X-Git-Url: http://git.ithinksw.org/~jspiros/python-ebml.git/blobdiff_plain/a0b38d57f9794bde5804be94c50e9671bea25833..495182be96c0580426942c5921be0d709e32b443:/ebml/core.py?ds=inline diff --git a/ebml/core.py b/ebml/core.py index d709268..8c01068 100644 --- a/ebml/core.py +++ b/ebml/core.py @@ -1,269 +1,168 @@ -import struct import datetime -from math import log -from .exceptions import * +import struct + +__all__ = ( + 'read_element_id', + 'read_element_size', + 'read_unsigned_integer', + 'read_signed_integer', + 'read_float', + 'read_string', + 'read_unicode_string', + 'read_date', + 'encode_element_id', + 'encode_element_size', + 'encode_unsigned_integer', + 'encode_signed_integer', + 'encode_float', + 'encode_string', + 'encode_unicode_string', + 'encode_date', +) -EBMLMaxSizeLength = 8 -EBMLMaxIDLength = 4 +MAXIMUM_ELEMENT_ID_LENGTH = 4 +MAXIMUM_ELEMENT_SIZE_LENGTH = 8 +MAXIMUM_UNSIGNED_INTEGER_LENGTH = 8 +MAXIMUM_SIGNED_INTEGER_LENGTH = 8 -def _read_vint_to_bytearray(stream, max_length=EBMLMaxSizeLength): + +def maximum_element_size_for_length(length): """ - Reads a vint from stream and returns a bytearray containing all of the bytes without doing any decoding. + Returns the maximum element size representable in a given number of bytes. - :arg stream: the source of the bytes - :type stream: a file-like object - :arg max_length: the maximum length, in bytes, of the vint (defaults to :data:`EBMLMaxSizeLength`) - :type max_length: int - :returns: bytearray + :arg length: the limit on the length of the encoded representation in bytes + :type length: int + :returns: the maximum element size representable + :rtype: int """ - marker_found = False - vint_bytes = bytearray() - vint_len = -7 - while not marker_found: - vint_len += 8 - if vint_len > max_length: - raise ParseError('vint length (%(vint_len)i) exceeds max_length (%(max_length)i)' % { - 'vint_len': vint_len, - 'max_length': max_length - }) - byte = ord(stream.read(1)) - vint_bytes.append(byte) - for pos in range(0, 8): - mask = 0b10000000 >> pos - if byte & mask: - vint_len += pos - marker_found = True - break - - remaining_bytes_len = vint_len - len(vint_bytes) - if remaining_bytes_len > 0: - vint_bytes.extend(ord(remaining_byte) for remaining_byte in stream.read(remaining_bytes_len)) - - if len(vint_bytes) != vint_len: - raise ParseError('Unable to read truncated vint of length %(vint_len)s from stream (%(vint_bytes)s bytes available)' % { - 'vint_len': vint_len, - 'vint_bytes': len(vint_bytes) - }) - - return vint_bytes + return (2**(7*length)) - 2 + +def decode_vint_length(byte, mask=True): + length = None + value_mask = None + for n in xrange(1, 9): + if byte & (2**8 - (2**(8 - n))) == 2**(8 - n): + length = n + value_mask = (2**(8 - n)) - 1 + break + if length is None: + raise IOError('Cannot decode invalid varible-length integer.') + if mask: + byte = byte & value_mask + return length, byte -def read_element_size(stream, max_length=EBMLMaxSizeLength): + +def read_element_id(stream): """ - Reads an EBML element size vint from stream and returns a tuple containing: - - * the size as an integer, or None if the size is undefined - * the length in bytes of the size descriptor (the vint) itself + Reads an element ID from a file-like object. - :arg stream: the source of the bytes - :type stream: a file-like object - :arg max_length: the maximum length, in bytes, of the vint storing the element size (defaults to :data:`EBMLMaxSizeLength`) - :type max_length: int - :returns: tuple + :arg stream: the file-like object + :returns: the decoded element ID and its length in bytes + :rtype: tuple """ - vint_bytes = _read_vint_to_bytearray(stream, max_length) - vint_len = len(vint_bytes) - - int_bytes = vint_bytes[((vint_len - 1) // 8):] - first_byte_mask = 0b10000000 >> ((vint_len - 1) % 8) - max_bytes = 0 - - value = int_bytes[0] & (first_byte_mask - 1) - - if value == (first_byte_mask - 1): - max_bytes += 1 - - for int_byte in int_bytes[1:]: - if int_byte == 0b11111111: - max_bytes += 1 - value = (value << 8) | int_byte - - if max_bytes == len(int_bytes): - value = None - - return value, vint_len + byte = ord(stream.read(1)) + length, id_ = decode_vint_length(byte, False) + if length > 4: + raise IOError('Cannot decode element ID with length > 8.') + for i in xrange(0, length - 1): + byte = ord(stream.read(1)) + id_ = (id_ * 2**8) + byte + return id_, length -def encode_element_size(size, min_length=None, max_length=EBMLMaxSizeLength): +def read_element_size(stream): """ - Encode the size of an EBML element as a vint, optionally with a minimum length. + Reads an element size from a file-like object. - :arg size: the element size, or None if undefined - :type size: int or None - :arg min_length: the minimum length, in bytes, of the resultant vint - :type min_length: int - :arg max_length: the maximum length, in bytes, of the vint storing the element size (defaults to :data:`EBMLMaxSizeLength`) - :type max_length: int - :returns: bytearray + :arg stream: the file-like object + :returns: the decoded size (or None if unknown) and the length of the descriptor in bytes + :rtype: tuple """ + byte = ord(stream.read(1)) + length, size = decode_vint_length(byte) - if size is not None: - size_bits = bin(size).lstrip('-0b') - size_bit_length = len(size_bits) - length_required = (abs(size_bit_length - 1) // 7) + 1 - if size_bit_length % 7 == 0 and '1' in size_bits and '0' not in size_bits: - length_required += 1 - length = max(length_required, min_length) - - alignment_bit_length = 0 - while ((length + alignment_bit_length + size_bit_length) // 8) < length: - alignment_bit_length += 1 - else: - length = min_length or 1 - required_bits = (length * 8) - length - size_bit_length = required_bits - size = (2**required_bits) - 1 - alignment_bit_length = 0 + for i in xrange(0, length - 1): + byte = ord(stream.read(1)) + size = (size * 2**8) + byte - if length > max_length: - raise ValueError('Unable to encode size (%i) with length %i (longer than limit of %i)' % (size, length, max_length)) + if size == maximum_element_size_for_length(length) + 1: + size = None - data = bytearray(length) - bytes_written = 0 - marker_written = False - while bytes_written < length: - index = (length - bytes_written) - 1 - if size: - data[index] = size & 0b11111111 - size = size >> 8 - if not size and not size_bit_length % 8 == 0: - if alignment_bit_length < (8 - (size_bit_length % 8)): - mask = 0b10000000 >> ((length - 1) % 8) - data[index] = data[index] | mask - alignment_bit_length = 0 - marker_written = True - else: - alignment_bit_length -= (8 - (size_bit_length % 8)) - bytes_written += 1 - else: - if alignment_bit_length: - if alignment_bit_length < 8: - data[index] = 0b10000000 >> ((length - 1) % 8) - alignment_bit_length = 0 - bytes_written += 1 - marker_written = True - else: - data[index] = 0b00000000 - alignment_bit_length -= 8 - bytes_written += 1 - else: - remaining_bytes = length - bytes_written - if not marker_written: - data[(remaining_bytes - 1)] = 0b00000001 - zero_range = range(0, (remaining_bytes - 1)) - else: - zero_range = range(0, remaining_bytes) - for index in zero_range: - data[index] = 0b00000000 - bytes_written += remaining_bytes - - return data + return size, length -def write_element_size(size, stream, min_length=None, max_length=EBMLMaxSizeLength): +def read_unsigned_integer(stream, size): """ - Write the size of an EBML element to stream, optionally with a minimum length. + Reads an encoded unsigned integer value from a file-like object. - :arg size: the element size, or None if undefined - :type size: int or None - :arg min_length: the minimum length, in bytes, to write - :type min_length: int - :arg max_length: the maximum length, in bytes, to write (defaults to :data:`EBMLMaxSizeLength`) - :type max_length: int - :returns: None + :arg stream: the file-like object + :arg size: the number of bytes to read and decode + :type size: int + :returns: the decoded unsigned integer value + :rtype: int """ - stream.write(encode_element_size(size, min_length, max_length)) + value = 0 + for i in xrange(0, size): + byte = ord(stream.read(1)) + value = (value << 8) | byte + return value -def read_element_id(stream, max_length=EBMLMaxIDLength): +def read_signed_integer(stream, size): """ - Reads an EBML element ID vint from stream and returns a tuple containing: + Reads an encoded signed integer value from a file-like object. - * the ID as an integer - * the length in bytes of the ID descriptor (the vint) itself - - :arg stream: the source of the bytes - :type stream: a file-like object - :arg max_length: the maximum length, in bytes, of the vint storing the element ID (defaults to :data:`EBMLMaxIDLength`) - :type max_length: int - :returns: tuple + :arg stream: the file-like object + :arg size: the number of bytes to read and decode + :type size: int + :returns: the decoded signed integer value + :rtype: int """ - vint_bytes = _read_vint_to_bytearray(stream, max_length) - vint_len = len(vint_bytes) - - value = 0 - - for vint_byte in vint_bytes: - value = (value << 8) | vint_byte - - return value, vint_len - - -# def encode_element_id(class_id, max_length=EBMLMaxIDLength): -# length = int(((log(class_id, 2) - 1) // 7) + 1) -# -# if length > max_length: -# raise ValueError('Unable to encode ID (%x) with length %i (longer than limit of %i)' % (class_id, length, max_length)) -# -# data = bytearray(length) -# -# bytes_written = 0 -# while bytes_written < length: -# data[(length - bytes_written) - 1] = class_id & 0b11111111 -# class_id >> 8 -# bytes_written += 1 -# -# return data -# -# -# def write_element_id(class_id, stream, max_length=EBMLMaxIDLength): -# stream.write(encode_element_id(class_id, max_length)) - - -def read_int(stream, size): value = 0 if size > 0: - byte = ord(stream.read(1)) - if (byte & 0b10000000) == 0b10000000: - value = -1 << 8 - value |= byte - for i in range(1, size): + first_byte = ord(stream.read(1)) + value = first_byte + for i in xrange(1, size): byte = ord(stream.read(1)) - value = (value << 1) | byte - return value - - -def read_uint(stream, size): - value = 0 - for i in range(0, size): - byte = ord(stream.read(1)) - value = (value << 8) | byte + value = (value << 8) | byte + if (first_byte & 0b10000000) == 0b10000000: + value = -(2**(size*8) - value) return value def read_float(stream, size): + """ + + Reads an encoded floating point value from a file-like object. + + :arg stream: the file-like object + :arg size: the number of bytes to read and decode (must be 0, 4, or 8) + :type size: int + :returns: the decoded floating point value + :rtype: float + + """ + if size not in (0, 4, 8): - # http://www.matroska.org/technical/specs/rfc/index.html allows for 10-byte floats. - # http://www.matroska.org/technical/specs/index.html specifies 4-byte and 8-byte only. - # I'm following the latter due to it being more up-to-date than the former, and because it's easier to implement. - raise ValueError('floats must be 0, 4, or 8 bytes long') + raise IOError('Cannot read floating point values with lengths other than 0, 4, or 8 bytes.') value = 0.0 if size in (4, 8): data = stream.read(size) @@ -275,27 +174,343 @@ def read_float(stream, size): def read_string(stream, size): + """ + + Reads an encoded ASCII string value from a file-like object. + + :arg stream: the file-like object + :arg size: the number of bytes to read and decode + :type size: int + :returns: the decoded ASCII string value + :rtype: str + + """ + value = '' if size > 0: value = stream.read(size) + value = value.partition(chr(0))[0] return value -def read_unicode(stream, size): +def read_unicode_string(stream, size): + """ + + Reads an encoded unicode string value from a file-like object. + + :arg stream: the file-like object + :arg size: the number of bytes to read and decode + :type size: int + :returns: the decoded unicode string value + :rtype: unicode + + """ + value = u'' if size > 0: data = stream.read(size) + data = data.partition(chr(0))[0] value = unicode(data, 'utf_8') return value -def read_date(stream): - size = 8 # date is always an 8-byte signed integer +def read_date(stream, size): + """ + + Reads an encoded date (and time) value from a file-like object. + + :arg stream: the file-like object + :arg size: the number of bytes to read and decode (must be 8) + :type size: int + :returns: the decoded date (and time) value + :rtype: datetime + + """ + + if size != 8: + raise IOError('Cannot read date values with lengths other than 8 bytes.') data = stream.read(size) nanoseconds = struct.unpack('>q', data)[0] delta = datetime.timedelta(microseconds=(nanoseconds // 1000)) - return datetime.datetime(2001, 1, 1) + delta + return datetime.datetime(2001, 1, 1, tzinfo=None) + delta + + +def octet(n): + """ + + Limits an integer or byte to 8 bits. + + """ + + return n & 0b11111111 + + +def vint_mask_for_length(length): + """ + + Returns the bitmask for the first byte of a variable-length integer (used for element ID and size descriptors). + + :arg length: the length of the variable-length integer + :type length: int + :returns: the bitmask for the first byte of the variable-length integer + :rtype: int + + """ + + return 0b10000000 >> (length - 1) + + +def encode_element_id(element_id): + """ + + Encodes an element ID. + + :arg element_id: an element ID + :type element_id: int + :returns: the encoded representation bytes + :rtype: bytearray + + """ + + length = MAXIMUM_ELEMENT_ID_LENGTH + while length and not (element_id & (vint_mask_for_length(length) << ((length - 1) * 8))): + length -= 1 + if not length: + raise ValueError('Cannot encode invalid element ID %s.' % hex(element_id)) + + data = bytearray(length) + for index in reversed(xrange(length)): + data[index] = octet(element_id) + element_id >>= 8 + + return data + + +def encode_element_size(element_size, length=None): + """ + + Encodes an element size. If element_size is None, the size will be encoded as unknown. If length is not None, the size will be encoded in that many bytes; otherwise, the size will be encoded in the minimum number of bytes required, or in 8 bytes if the size is unknown (element_size is None). + + :arg element_size: the element size, or None if unknown + :type element_size: int or None + :arg length: the length of the encoded representation, or None for the minimum length required (defaults to None) + :type length: int or None + :returns: the encoded representation bytes + :rtype: bytearray + + """ + + if length is not None and (length < 1 or length > MAXIMUM_ELEMENT_SIZE_LENGTH): + raise ValueError('Cannot encode element sizes into representations shorter than one byte long or longer than %i bytes long.' % MAXIMUM_ELEMENT_SIZE_LENGTH) + if element_size is not None: + if element_size > maximum_element_size_for_length(MAXIMUM_ELEMENT_SIZE_LENGTH if length is None else length): + raise ValueError('Cannot encode element size %i as it would have an encoded representation longer than %i bytes.' % (element_size, (MAXIMUM_ELEMENT_SIZE_LENGTH if length is None else length))) + req_length = 1 + while (element_size >> ((req_length - 1) * 8)) >= (vint_mask_for_length(req_length) - 1) and req_length < MAXIMUM_ELEMENT_SIZE_LENGTH: + req_length += 1 + if length is None: + length = req_length + else: + if length is None: + length = 8 # other libraries do this, so unless another length is specified for the unknown size descriptor, do as they do to avoid compatibility issues. + element_size = maximum_element_size_for_length(length) + 1 + + data = bytearray(length) + for index in reversed(xrange(length)): + data[index] = octet(element_size) + element_size >>= 8 + if not index: + data[index] = data[index] | vint_mask_for_length(length) + + return data -def read_binary(stream, size): - return stream.read(size) \ No newline at end of file +def encode_unsigned_integer(uint, length=None): + """ + + Encodes an unsigned integer value. If length is not None, uint will be encoded in that many bytes; otherwise, uint will be encoded in the minimum number of bytes required. If uint is None or 0, the minimum number of bytes required is 0. + + :arg uint: the unsigned integer value + :type uint: int + :arg length: the length of the encoded representation, or None for the minimum length required (defaults to None) + :type length: int or None + :returns: the encoded representation bytes + :rtype: bytearray + + """ + + if uint is None: + uint = 0 + if uint > ((2**((MAXIMUM_UNSIGNED_INTEGER_LENGTH if length is None else length) * 8)) - 1): + raise ValueError('Cannot encode unsigned integer value %i as it would have an encoded representation longer than %i bytes.' % (uint, (MAXIMUM_UNSIGNED_INTEGER_LENGTH if length is None else length))) + elif uint == 0: + req_length = 0 + else: + req_length = 1 + while uint >= (1 << (req_length * 8)) and req_length < MAXIMUM_UNSIGNED_INTEGER_LENGTH: + req_length += 1 + if length is None: + length = req_length + + data = bytearray(length) + for index in reversed(xrange(length)): + data[index] = octet(uint) + uint >>= 8 + + return data + + +def encode_signed_integer(sint, length=None): + """ + + Encodes a signed integer value. If length is not None, sint will be encoded in that many bytes; otherwise, sint will be encoded in the minimum number of bytes required. If sint is None or 0, the minimum number of bytes required is 0. + + :arg sint: the signed integer value + :type sint: int + :arg length: the length of the encoded representation, or None for the minimum length required (defaults to None) + :type length: int or None + :returns: the encoded representation bytes + :rtype: bytearray + + """ + + if sint is None: + sint = 0 + if not (-(2**(7+(8*((MAXIMUM_SIGNED_INTEGER_LENGTH if length is None else length)-1)))) <= sint <= (2**(7+(8*((MAXIMUM_SIGNED_INTEGER_LENGTH if length is None else length)-1))))-1): + raise ValueError('Cannot encode signed integer value %i as it would have an encoded representation longer than %i bytes.' % (sint, (MAXIMUM_SIGNED_INTEGER_LENGTH if length is None else length))) + elif sint == 0: + req_length = 0 + uint = 0 + if length is None: + length = req_length + else: + uint = ((-sint - 1) << 1) if sint < 0 else (sint << 1) + req_length = 1 + while uint >= (1 << (req_length * 8)) and req_length < MAXIMUM_UNSIGNED_INTEGER_LENGTH: + req_length += 1 + if length is None: + length = req_length + if sint >= 0: + uint = sint + else: + uint = 2**(length*8) - abs(sint) + + data = bytearray(length) + for index in reversed(xrange(length)): + data[index] = octet(uint) + uint >>= 8 + + return data + + +def encode_float(float_, length=None): + """ + + Encodes a floating point value. If length is not None, float_ will be encoded in that many bytes; otherwise, float_ will be encoded in 0 bytes if float_ is None or 0, and 8 bytes in all other cases. If float_ is not None or 0 and length is 0, ValueError will be raised. + + :arg float_: the floating point value + :type float_: float + :arg length: the length of the encoded representation, or None (defaults to None) + :type length: int or None + :returns: the encoded representation bytes + :rtype: bytearray + + """ + + if length not in (None, 0, 4, 8): + raise ValueError('Cannot encode floating point values with lengths other than 0, 4, or 8 bytes.') + if float_ is None: + float_ = 0.0 + if float_ == 0.0: + if length is None: + length = 0 + else: + if length is None: + length = 8 + elif length == 0: + raise ValueError('Cannot encode floating point value %f as it would have an encoded representation longer than 0 bytes.' % float_) + + if length in (4, 8): + data = bytearray(struct.pack({ + 4: '>f', + 8: '>d' + }[length], float_)) + else: + data = bytearray() + + return data + + +def encode_string(string, length=None): + """ + + Encodes an ASCII string value. If length is not None, string will be encoded in that many bytes by padding with zero bytes at the end if necessary; otherwise, string will be encoded in the minimum number of bytes required. If string is None or empty, the minimum number of bytes required is 0. + + :arg string: the ASCII string value + :type string: str + :arg length: the length of the encoded representation, or None for the minimum length required (defaults to None) + :type length: int or None + :returns: the encoded representation bytes + :rtype: bytearray + + """ + + if string is None: + string = '' + if length is None: + length = len(string) + else: + if length < len(string): + raise ValueError('Cannot encode ASCII string value \'%s\' as it would have an encoded representation longer than %i bytes.' % (string, length)) + elif length > len(string): + for i in xrange(0, (length - len(string))): + string += chr(0) + + return bytearray(string) + + +def encode_unicode_string(string, length=None): + """ + + Encodes a unicode string value. If length is not None, string will be encoded in that many bytes by padding with zero bytes at the end if necessary; otherwise, string will be encoded in the minimum number of bytes required. If string is None or empty, the minimum number of bytes required is 0. + + :arg string: the unicode string value + :type string: unicode + :arg length: the length of the encoded representation, or None for the minimum length required (defaults to None) + :type length: int or None + :returns: the encoded representation bytes + :rtype: bytearray + + """ + + if string is None: + string = u'' + return encode_string(string.encode('utf_8'), length) + + +def encode_date(date, length=None): + """ + + Encodes a date (and time) value. If length is not None, it must be 8. If date is None, the current date (and time) will be encoded. + + :arg date: the date (and time) value + :type date: datetime.datettime + :arg length: the length of the encoded representation (must be 8), or None + :type length: int or None + :returns: the encoded representation bytes + :rtype: bytearray + + """ + + if date is None: + date = datetime.datetime.utcnow() + else: + date = (date - date.utcoffset()).replace(tzinfo=None) + if length is None: + length = 8 + elif length != 8: + raise ValueError('Cannot encode date value %s with any length other than 8 bytes.') + + delta = date - datetime.datetime(2001, 1, 1, tzinfo=None) + nanoseconds = (delta.microseconds + ((delta.seconds + (delta.days * 24 * 60 * 60)) * 10**6)) * 10**3 + return encode_signed_integer(nanoseconds, length)