-from .files import *
\ No newline at end of file
+from .core import *
+from .schema import *
\ No newline at end of file
+++ /dev/null
-from .schema import *
-from .core import *
-
-
-__all__ = ('EBMLFile', 'MatroskaFile')
-
-
-TYPE_READERS = {
- INT: read_signed_integer,
- UINT: read_unsigned_integer,
- FLOAT: read_float,
- STRING: read_string,
- UNICODE: read_unicode_string,
- DATE: read_date
-}
-
-
-class EBMLFileElement(object):
- def __init__(self, stream, schema, parent=None):
- self.stream = stream
- self.schema = schema
- self.parent = parent
- self.class_id, self.class_id_len = read_element_id(self.stream)
- try:
- self.element = schema.element_with_class_id(self.class_id)
- except:
- self.element = None
- else:
- if self.parent is None:
- if not self.element in self.schema.root_elements():
- self.element = None
- else:
- if not self.element in self.schema.child_elements_of_element(self.parent):
- self.element = None
- self.size, self.size_len = read_element_size(self.stream)
- self.offset = self.stream.tell()
- self._read_contents()
-
- def _read_contents(self):
- contents = None
- if self.element is not None:
- if self.element.data_type in TYPE_READERS:
- contents = TYPE_READERS[self.element.data_type](self.stream, self.size)
- elif self.element.data_type == CONTAINER:
- read_len = 0
- contents = []
- while self.size > read_len:
- sub_el = EBMLFileElement(self.stream, self.schema, self.element)
- read_len += (sub_el.class_id_len + sub_el.size_len + sub_el.size)
- contents.append(sub_el)
- else:
- self.stream.seek(self.offset + self.size, 0)
- else:
- self.stream.seek(self.offset + self.size, 0)
- self.contents = contents
-
- def pprint(self, indent=0):
- sargs = {
- 'class_name': self.element.class_name or 'Unknown',
- 'class_id': self.class_id,
- 'size': self.size,
- 'value': self.contents or None
- }
- def pprint_(foo):
- print ('\t' * indent) + foo
- if not self.contents:
- pprint_('<%(class_name)s id=\'%(class_id)x\' size=\'%(size)i\' />' % sargs)
- else:
- if self.element.data_type == CONTAINER:
- pprint_('<%(class_name)s id=\'%(class_id)x\' size=\'%(size)i\'>' % sargs)
- for sub_el in self.contents:
- sub_el.pprint(indent + 1)
- pprint_('</%(class_name)s>' % sargs)
- else:
- pprint_('<%(class_name)s id=\'%(class_id)x\' size=\'%(size)i\'>%(value)s</%(class_name)s>' % sargs)
-
- def __repr__(self):
- return '<%(class_name)s id=%(class_id)x size=%(size)i>' % {
- 'class_name': self.element.class_name or '?',
- 'class_id': self.element.class_id or self.class_id,
- 'size': self.size
- }
-
-
-class EBMLFile(object):
- default_schema = EBML
-
- def __init__(self, name_or_stream, schema=None):
- if schema is None:
- schema = self.default_schema
- self.schema = schema
-
- if isinstance(name_or_stream, basestring):
- self.stream = open(name_or_stream, 'rb')
- else:
- self.stream = name_or_stream
-
- self._read_contents()
-
- def _read_contents(self):
- self.contents = []
- while True:
- try:
- self.contents.append(EBMLFileElement(self.stream, self.schema, None))
- except:
- break
-
- def pprint(self):
- for el in self.contents:
- el.pprint()
-
-
-class MatroskaFile(EBMLFile):
- default_schema = Matroska
\ No newline at end of file
from .base import *
-from .matroska import Matroska
\ No newline at end of file
+from .ebml import EBMLDocument
+from .matroska import MatroskaDocument
\ No newline at end of file
-__all__ = ('INT', 'UINT', 'FLOAT', 'STRING', 'UNICODE', 'DATE', 'BINARY', 'CONTAINER', 'Element', 'EBML')
+import abc
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO
+from ..core import *
-INT, UINT, FLOAT, STRING, UNICODE, DATE, BINARY, CONTAINER = range(0, 8)
-
-
-class Element(object):
- class_id = None
- class_name = 'Unknown'
- class_parents = ()
- class_global = False
- class_root = False
- data_type = BINARY
-
-
-class EBMLElement(Element):
- class_id = 0x1A45DFA3
- class_name = 'EBML'
- class_root = True
- data_type = CONTAINER
-
-
-class EBMLVersionElement(Element):
- class_id = 0x4286
- class_name = 'EBMLVersion'
- class_parents = (EBMLElement,)
- data_type = UINT
-
-
-class EBMLReadVersionElement(Element):
- class_id = 0x42F7
- class_name = 'EBMLReadVersion'
- class_parents = (EBMLElement,)
- data_type = UINT
-
-
-class EBMLMaxIDLengthElement(Element):
- class_id = 0x42F2
- class_name = 'EBMLMaxIDLength'
- class_parents = (EBMLElement,)
- data_type = UINT
-
-
-class EBMLMaxSizeLengthElement(Element):
- class_id = 0x42F3
- class_name = 'EBMLMaxSizeLength'
- class_parents = (EBMLElement,)
- data_type = UINT
-
-
-class DocTypeElement(Element):
- class_id = 0x4282
- class_name = 'DocType'
- class_parents = (EBMLElement,)
- data_type = STRING
-
-
-class DocTypeVersionElement(Element):
- class_id = 0x4287
- class_name = 'DocTypeVersion'
- class_parents = (EBMLElement,)
- data_type = UINT
-
-
-class DocTypeReadVersionElement(Element):
- class_id = 0x4285
- class_name = 'DocTypeReadVersion'
- class_parents = (EBMLElement,)
- data_type = UINT
-
-
-class CRC32Element(Element):
- class_id = 0xBF
- class_name = 'CRC-32'
- class_global = True
- data_type = BINARY
-
-
-class VoidElement(Element):
- class_id = 0xEC
- class_name = 'Void'
- class_global = True
- data_type = BINARY
-
-
-class SignatureSlotElement(Element):
- class_id = 0x1B538667
- class_name = 'SignatureSlot'
- class_global = True
- data_type = CONTAINER
-
-
-class SignatureAlgoElement(Element):
- class_id = 0x7E8A
- class_name = 'SignatureAlgo'
- class_parents = (SignatureSlotElement,)
- data_type = UINT
+__all__ = ('UnknownElement', 'Element', 'Document', 'INT', 'UINT', 'FLOAT', 'STRING', 'UNICODE', 'DATE', 'BINARY', 'CONTAINER')
-class SignatureHashElement(Element):
- class_id = 0x7E9A
- class_name = 'SignatureHash'
- class_parents = (SignatureSlotElement,)
- data_type = UINT
-
-
-class SignaturePublicKeyElement(Element):
- class_id = 0x7EA5
- class_name = 'SignaturePublicKey'
- class_parents = (SignatureSlotElement,)
- data_type = BINARY
-
-
-class SignatureElement(Element):
- class_id = 0x7EB5
- class_name = 'Signature'
- class_parents = (SignatureSlotElement,)
- data_type = BINARY
-
-
-class SignatureElementsElement(Element):
- class_id = 0x7E5B
- class_name = 'SignatureElements'
- class_parents = (SignatureSlotElement,)
- data_type = CONTAINER
-
-
-class SignatureElementListElement(Element):
- class_id = 0x7E7B
- class_name = 'SignatureElementList'
- class_parents = (SignatureElementsElement,)
- data_type = CONTAINER
-
-
-class SignedElementElement(Element):
- class_id = 0x6532
- class_name = 'SignedElement'
- class_parents = (SignatureElementListElement,)
- data_type = BINARY
+# Data-type tags for element classes. They key the READERS, ENCODERS and
+# VALIDATORS tables; CONTAINER is handled separately wherever those are used.
+INT, UINT, FLOAT, STRING, UNICODE, DATE, BINARY, CONTAINER = range(0, 8)
-class Schema(object):
- doc_type = None
- version = None
- elements_by_class_id = None
+# Maps a data-type tag to a callable invoked as reader(stream, size) that
+# decodes an element body (this is how Element.value calls it).
+READERS = {
+ INT: read_signed_integer,
+ UINT: read_unsigned_integer,
+ FLOAT: read_float,
+ STRING: read_string,
+ UNICODE: read_unicode_string,
+ DATE: read_date,
+ BINARY: lambda stream, size: bytearray(stream.read(size))
+}
+
+
+# Maps a data-type tag to a callable that serialises a Python value into the
+# raw bytes of an element body.
+ENCODERS = {
+    INT: encode_signed_integer,
+    UINT: encode_unsigned_integer,
+    FLOAT: encode_float,
+    STRING: encode_string,
+    UNICODE: encode_unicode_string,
+    DATE: encode_date,
+    # ``length`` is optional: Element.encoding calls every encoder with the
+    # value alone, and a mandatory second argument made BINARY raise TypeError.
+    BINARY: lambda binary, length=None: binary
+}
+
+
+# Maps a data-type tag to a predicate telling whether a Python value is
+# acceptable for an element of that type.
+# NOTE(review): ``datetime`` is not imported in the visible part of this
+# module; presumably it arrives through ``from ..core import *`` -- confirm.
+VALIDATORS = {
+    INT: lambda value: isinstance(value, (int, long)),
+    UINT: lambda value: isinstance(value, (int, long)) and value == abs(value),
+    FLOAT: lambda value: isinstance(value, float),
+    STRING: lambda value: isinstance(value, str),
+    UNICODE: lambda value: isinstance(value, basestring),
+    DATE: lambda value: isinstance(value, datetime.datetime),
+    BINARY: lambda value: isinstance(value, (str, bytes, bytearray))
+}
+
+
+class BaseElement(object):
+ """Abstract base shared by the known (Element) and unknown (UnknownElement) element classes."""
+ __metaclass__ = abc.ABCMeta
- @classmethod
- def element_with_class_id(cls, class_id):
- if cls.elements_by_class_id is None:
- cls.elements_by_class_id = {}
- for element in cls.elements:
- cls.elements_by_class_id[element.class_id] = element
- return cls.elements_by_class_id[class_id]
+ # Abstract placeholders: ABCMeta refuses to instantiate a subclass that
+ # leaves any of these three undefined.
+ id = abc.abstractproperty()
+ name = abc.abstractproperty()
+ type = abc.abstractproperty()
+ # Class-level defaults that concrete element classes override as needed.
+ default = None
+ children = ()
+ mandatory = False
+ multiple = False
+
+
+class UnknownElement(BaseElement):
+ """Stand-in for an element whose ID matched no known element class.
+
+ ``encoding`` is stored exactly as passed in; ``read_elements`` passes a ``(size, data)`` pair.
+ """
+ id = None
+ name = 'Unknown'
+ type = BINARY
+ def __init__(self, id, encoding):
+ self.id = id
+ self.encoding = encoding
+
+
+def read_elements(stream, size, document, children):
+    """Read consecutive elements from ``stream`` and return them as a list.
+
+    :arg stream: file-like object positioned at the first element header
+    :arg size: number of bytes to consume, or ``None`` to keep reading until an element can no longer be read
+    :arg document: owning document; its ``globals`` are accepted alongside ``children`` and it is handed to every element created
+    :arg children: tuple of element classes allowed at this level
+    :returns: list of element instances, with ``UnknownElement`` standing in for IDs that match no candidate class
+    """
+    elements = []
+    # ``size > 0`` rather than plain truthiness: a child that overran its
+    # parent drives the budget negative, which must also end the loop.
+    while size is None or size > 0:
+        try:
+            element_id, element_id_size = read_element_id(stream)
+            element_size, element_size_size = read_element_size(stream)
+            element_encoding = (element_size, bytearray(stream.read(element_size)))
+        except Exception:
+            # Best effort: a header that cannot be read ends the scan. Unlike
+            # a bare ``except`` this lets KeyboardInterrupt/SystemExit through.
+            break
+        # First match wins, so a class in ``children`` takes precedence over a global.
+        element_class = next(
+            (candidate for candidate in (children + document.globals) if candidate.id == element_id),
+            None
+        )
+        if element_class is None:
+            element = UnknownElement(element_id, element_encoding)
+        else:
+            element = element_class(document, encoding=element_encoding)
+        elements.append(element)
+        if size is not None:
+            size -= element_id_size + element_size_size + element_size
+    return elements
+
+
+class Element(BaseElement):
@classmethod
- def global_elements(cls):
- return [element for element in cls.elements if element.class_global]
+    def check_value(cls, value):
+        """Report whether ``value`` is acceptable for this element class.
+
+        :arg value: candidate value
+        :returns: ``True`` if the value suits ``cls.type``, ``False`` otherwise; for a container a single ``Element`` or a list/tuple made only of ``Element`` instances is accepted
+        :raises NotImplementedError: if ``cls.type`` is neither a validated type nor CONTAINER
+        """
+        if cls.type in VALIDATORS:
+            return VALIDATORS[cls.type](value)
+        if cls.type == CONTAINER:
+            if isinstance(value, (list, tuple)):
+                # Each member has to be checked; testing the enclosing
+                # sequence itself rejected every non-empty list.
+                return all(isinstance(item, Element) for item in value)
+            return isinstance(value, Element)
+        raise NotImplementedError('Unsupported element type.')
- @classmethod
- def root_elements(cls):
- return [element for element in cls.elements if element.class_root]
+    def __init__(self, document, value=None, encoding=None):
+        """Bind the element to ``document`` and store its initial state.
+
+        :arg document: the document this element belongs to
+        :arg value: decoded value, if already known
+        :arg encoding: ``(size, data)`` pair holding the raw body, if already known
+        """
+        self._encoding = encoding
+        self._value = value
+        self.document = document
- @classmethod
- def child_elements_of_element(cls, parent):
- children = [element for element in cls.elements if parent in element.class_parents]
- children += cls.global_elements()
- if 'self' in parent.class_parents and parent not in children:
- children.append(parent)
- return children
-
-
-class EBML(Schema):
- elements = (
- EBMLElement,
- EBMLVersionElement,
- EBMLReadVersionElement,
- EBMLMaxIDLengthElement,
- EBMLMaxSizeLengthElement,
- DocTypeElement,
- DocTypeVersionElement,
- DocTypeReadVersionElement,
- CRC32Element,
- VoidElement,
- SignatureSlotElement,
- SignatureAlgoElement,
- SignatureHashElement,
- SignaturePublicKeyElement,
- SignatureElement,
- SignatureElementsElement,
- SignatureElementListElement,
- SignedElementElement
- )
\ No newline at end of file
+    @property
+    def value(self):
+        """Decoded value of the element, read lazily from the raw encoding.
+
+        When no value is cached and an encoding is present, the body is decoded with the reader for ``self.type`` (or parsed into child elements for a container) and the result is cached.
+        """
+        if self._value is None and self._encoding is not None:
+            size, data = self._encoding
+            if self.type in READERS:
+                self._value = READERS[self.type](StringIO(data), size)
+            elif self.type == CONTAINER:
+                self._value = read_elements(StringIO(data), size, self.document, self.children)
+        return self._value
+
+    # The setter must reuse the property's own name: under any other name the
+    # decorator binds a second property and ``value`` stays read-only.
+    @value.setter
+    def value(self, value):
+        """Validate and store a new value, discarding the cached encoding.
+
+        :raises ValueError: if ``check_value`` rejects the value
+        """
+        if not self.check_value(value):
+            raise ValueError('Unsupported element value.')
+        self._value = value
+        self._encoding = None
+
+    # The setter was previously bound under this name by mistake; the alias
+    # keeps any code that assigned through it working.
+    set_value = value
+
+    @property
+    def encoding(self):
+        """``(size, data)`` pair for the element body, built from the value on demand.
+
+        A cached encoding is returned as is. Otherwise the value is serialised with the encoder for ``self.type``, or, for a container, by concatenating the complete encoding of every child. With no value the body is empty.
+        """
+        if self._encoding is None:
+            data = bytearray()
+            if self._value is not None:
+                if self.type in ENCODERS:
+                    data = ENCODERS[self.type](self._value)
+                elif self.type == CONTAINER:
+                    # check_value also accepts one bare element as a container value.
+                    if isinstance(self._value, (list, tuple)):
+                        members = self._value
+                    else:
+                        members = (self._value,)
+                    for element in members:
+                        body_size, body = element.encoding
+                        # A child occupies ID + size + body inside its parent.
+                        # Appending only the body while counting the header
+                        # left the reported size and the data out of step.
+                        data.extend(encode_element_id(element.id))
+                        data.extend(encode_element_size(body_size))
+                        data.extend(body)
+            self._encoding = (len(data), data)
+        return self._encoding
+
+    @property
+    def id_size(self):
+        """Length of the encoded element ID."""
+        encoded_id = encode_element_id(self.id)
+        return len(encoded_id)
+
+    @property
+    def size_size(self):
+        """Length of the encoded body-size field."""
+        encoded_size = encode_element_size(self.body_size)
+        return len(encoded_size)
+
+    @property
+    def head_size(self):
+        """Combined length of the ID and body-size fields."""
+        id_part = self.id_size
+        size_part = self.size_size
+        return id_part + size_part
+
+    @property
+    def body_size(self):
+        """Size component of the ``(size, data)`` encoding pair."""
+        body_length = self.encoding[0]
+        return body_length
+
+    @property
+    def size(self):
+        """Total size of the element: header plus body."""
+        header = self.head_size
+        body = self.body_size
+        return header + body
+
+
+class Document(object):
+    """Abstract base for documents read from a stream.
+
+    Subclasses supply ``type`` and ``version``; ``children`` lists the element classes accepted at the top level and ``globals`` those accepted in addition at every level.
+    """
+    __metaclass__ = abc.ABCMeta
+
+    type = abc.abstractproperty()
+    version = abc.abstractproperty()
+    children = ()
+    globals = ()
+
+    def __init__(self, stream):
+        """Remember ``stream``; nothing is read until ``roots`` is accessed."""
+        self._roots = None
+        self.stream = stream
+
+    @property
+    def roots(self):
+        """Top-level elements of the document, parsed on first access and cached."""
+        if self._roots is not None:
+            return self._roots
+        self._roots = read_elements(self.stream, None, self, self.children)
+        return self._roots
\ No newline at end of file
--- /dev/null
+from .base import *
+
+
+# CRC-32 and Void are listed in EBMLDocument.globals further down.
+class CRC32Element(Element):
+ id = 0xBF
+ name = 'CRC-32'
+ type = BINARY
+
+
+class VoidElement(Element):
+ id = 0xEC
+ name = 'Void'
+ type = BINARY
+
+
+# The elements below are reachable from SignatureSlot, directly through its
+# ``children`` or through SignatureElements / SignatureElementList. Child
+# classes are declared before the containers that reference them.
+class SignatureAlgoElement(Element):
+ id = 0x7E8A
+ name = 'SignatureAlgo'
+ type = UINT
+ multiple = True
+
+
+class SignatureHashElement(Element):
+ id = 0x7E9A
+ name = 'SignatureHash'
+ type = UINT
+
+
+class SignaturePublicKeyElement(Element):
+ id = 0x7EA5
+ name = 'SignaturePublicKey'
+ type = BINARY
+
+
+class SignatureElement(Element):
+ id = 0x7EB5
+ name = 'Signature'
+ type = BINARY
+
+
+class SignedElementElement(Element):
+ id = 0x6532
+ name = 'SignedElement'
+ type = BINARY
+
+
+class SignatureElementListElement(Element):
+ id = 0x7E7B
+ name = 'SignatureElementList'
+ children = (SignedElementElement,)
+ type = CONTAINER
+ multiple = True
+
+
+class SignatureElementsElement(Element):
+    """Container element whose children are SignatureElementList elements."""
+    id = 0x7E5B
+    name = 'SignatureElements'
+    # The trailing comma makes this a one-item tuple. Without it ``children``
+    # is the class itself and ``children + document.globals`` in
+    # read_elements raises TypeError.
+    children = (SignatureElementListElement,)
+    type = CONTAINER
+
+
+# SignatureSlot is listed in EBMLDocument.globals further down.
+class SignatureSlotElement(Element):
+ id = 0x1B538667
+ name = 'SignatureSlot'
+ children = (SignatureAlgoElement, SignatureHashElement, SignaturePublicKeyElement, SignatureElement, SignatureElementsElement)
+ type = CONTAINER
+
+
+# The classes from here to DocTypeReadVersion are the children of EBMLElement.
+class EBMLVersionElement(Element):
+ id = 0x4286
+ name = 'EBMLVersion'
+ type = UINT
+ mandatory = True
+ default = 1
+
+
+class EBMLReadVersionElement(Element):
+ id = 0x42F7
+ name = 'EBMLReadVersion'
+ type = UINT
+ mandatory = True
+ default = 1
+
+
+class EBMLMaxIDLengthElement(Element):
+ id = 0x42F2
+ name = 'EBMLMaxIDLength'
+ type = UINT
+ mandatory = True
+ default = 4
+
+
+class EBMLMaxSizeLengthElement(Element):
+ id = 0x42F3
+ name = 'EBMLMaxSizeLength'
+ type = UINT
+ mandatory = True
+ default = 8
+
+
+class DocTypeElement(Element):
+ id = 0x4282
+ name = 'DocType'
+ type = STRING
+ mandatory = True
+
+
+class DocTypeVersionElement(Element):
+ id = 0x4287
+ name = 'DocTypeVersion'
+ type = UINT
+ mandatory = True
+
+
+class DocTypeReadVersionElement(Element):
+ id = 0x4285
+ name = 'DocTypeReadVersion'
+ type = UINT
+ mandatory = True
+
+
+# The only class listed in EBMLDocument.children.
+class EBMLElement(Element):
+ id = 0x1A45DFA3
+ name = 'EBML'
+ type = CONTAINER
+ children = (EBMLVersionElement, EBMLReadVersionElement, EBMLMaxIDLengthElement, EBMLMaxSizeLengthElement, DocTypeElement, DocTypeVersionElement, DocTypeReadVersionElement)
+ mandatory = True
+ multiple = True
+
+
+class EBMLDocument(Document):
+ """Document whose top level accepts EBML elements, with CRC-32, Void and SignatureSlot as globals."""
+ children = (EBMLElement,)
+ globals = (CRC32Element, VoidElement, SignatureSlotElement)
\ No newline at end of file
from .specs import parse_specdata
-_Elements, Matroska = parse_specdata(os.path.join(os.path.dirname(__file__), 'matroska.xml'), 'Matroska')
+_Elements, MatroskaDocument = parse_specdata(os.path.join(os.path.dirname(__file__), 'matroska.xml'), 'MatroskaDocument', 'matroska', 1)
for name, element in _Elements.iteritems():
from xml.etree.ElementTree import parse as parse_xml
-from .base import INT, UINT, FLOAT, STRING, UNICODE, DATE, BINARY, CONTAINER, Element, Schema
+from .base import INT, UINT, FLOAT, STRING, UNICODE, DATE, BINARY, CONTAINER, Element, Document
SPECDATA_TYPES = {
}
-def parse_specdata(source, schema_name):
+def parse_specdata(source, doc_name, doc_type, doc_version):
"""
Reads a schema specification from a file (e.g., specdata.xml) or file-like object, and returns a tuple containing:
* a mapping of class names to Element subclasses
- * a Schema subclass
+ * a Document subclass
:arg source: the file or file-like object
:type source: str or file-like object
tree = parse_xml(source)
elements = {}
- parent_elements = []
+ globals = []
- for element_element in tree.getiterator('element'):
- raw_attrs = element_element.attrib
-
- element_name = '%sElement' % raw_attrs.get('cppname', raw_attrs.get('name'))
- element_level = int(raw_attrs['level'])
- element_attrs = {
- '__module__': None,
- 'class_id': int(raw_attrs['id'], 0),
- 'class_name': raw_attrs['name'],
- 'data_type': SPECDATA_TYPES[raw_attrs['type']]
- }
-
- while parent_elements and element_level <= parent_elements[-1][0]:
- parent_elements.pop()
-
- if element_level == -1:
- element_attrs['class_global'] = True
- parent_elements = []
- elif element_level == 0:
- element_attrs['class_root'] = True
- parent_elements = []
- else:
- if raw_attrs.get('recursive', '0') == '1':
- element_attrs['class_parents'] = (parent_elements[-1][1], 'self')
+ def child_elements(parent_level, element_list):
+ children = []
+ while element_list:
+ raw_element = element_list[0]
+ raw_attrs = raw_element.attrib
+
+ element_level = int(raw_attrs['level'])
+
+ is_global = False
+ if element_level == -1:
+ is_global = True
+ elif parent_level is not None and not element_level > parent_level:
+ break
+ element_list = element_list[1:]
+
+ element_name = '%sElement' % raw_attrs.get('cppname', raw_attrs.get('name')).translate(None, '-')
+ element_attrs = {
+ '__module__': None,
+ 'id': int(raw_attrs['id'], 0),
+ 'name': raw_attrs['name'],
+ 'type': SPECDATA_TYPES[raw_attrs['type']],
+ 'mandatory': True if raw_attrs.get('mandatory', False) == '1' else False,
+ 'multiple': True if raw_attrs.get('multiple', False) == '1' else False
+ }
+ try:
+ element_attrs['default'] = {
+ INT: lambda default: int(default),
+ UINT: lambda default: int(default),
+ FLOAT: lambda default: float(default),
+ STRING: lambda default: str(default),
+ UNICODE: lambda default: unicode(default)
+ }.get(element_attrs['type'], lambda default: default)(raw_attrs['default'])
+ except (KeyError, ValueError):
+ element_attrs['default'] = None
+
+ element_attrs['children'], element_list = child_elements(element_level if not is_global else 0, element_list)
+
+ element = type(element_name, (Element,), element_attrs)
+ elements[element_name] = element
+ if is_global:
+ globals.append(element)
else:
- element_attrs['class_parents'] = (parent_elements[-1][1],)
-
- element = type(element_name, (Element,), element_attrs)
- elements[element_name] = element
- parent_elements.append((element_level, element))
+ children.append(element)
+ return tuple(children), element_list
+
+ children = child_elements(None, tree.getroot().getchildren())[0]
- schema_attrs = {
+ document_attrs = {
'__module__': None,
- 'elements': tuple(elements.values())
+ 'type': doc_type,
+ 'version': doc_version,
+ 'children': children,
+ 'globals': tuple(globals)
}
- schema = type(schema_name, (Schema,), schema_attrs)
+ document = type(doc_name, (Document,), document_attrs)
- return elements, schema
\ No newline at end of file
+ return elements, document
\ No newline at end of file
--- /dev/null
+from ..schema import EBMLDocument, UnknownElement, CONTAINER, BINARY
+
+
+def dump_element(element, indent=0):
+    """Print ``element`` as one XML-like line, or as a nested block for containers.
+
+    :arg element: an element instance (an ``UnknownElement`` is reported by ID and byte count only)
+    :arg indent: nesting depth; each level adds one tab in front of the line
+    """
+    prefix = '\t' * indent
+    if isinstance(element, UnknownElement):
+        print(prefix + '<Unknown id=\'%s\' bytes=\'%i\' />' % (hex(element.id), element.encoding[0]))
+        return
+    # All three fields are gathered up front, whichever branch ends up using them.
+    fields = {
+        'name': element.name,
+        'bytes': element.body_size,
+        'value': element.value
+    }
+    if element.type == CONTAINER:
+        print(prefix + '<%(name)s>' % fields)
+        for child in element.value:
+            dump_element(child, indent + 1)
+        print(prefix + '</%(name)s>' % fields)
+    elif element.type == BINARY:
+        print(prefix + '<%(name)s bytes=\'%(bytes)i\' />' % fields)
+    else:
+        print(prefix + '<%(name)s>%(value)s</%(name)s>' % fields)
+
+
+def dump_document(document):
+    """Print every root element of ``document`` with no indentation."""
+    for root in document.roots:
+        dump_element(root, 0)
+
+
+# Command-line entry point: dump the element tree of one file, optionally
+# using a document class named as MODULE.CLASS.
+if __name__ == '__main__':
+    import sys
+    from optparse import OptionParser
+
+    parser = OptionParser(usage='Usage: %prog [OPTION] FILE')
+    parser.add_option('--document-class', dest='document_class', help='the document class to use', metavar='CLASS')
+    options, args = parser.parse_args()
+
+    if options.document_class is None:
+        # Document leaves ``type`` and ``version`` abstract; fill them in so
+        # the plain EBML document class can be instantiated.
+        class doc_cls(EBMLDocument):
+            type = None
+            version = None
+    else:
+        mod_name, _, cls_name = options.document_class.rpartition('.')
+        if not mod_name:
+            # A bare class name leaves the module part empty, and
+            # __import__('') raises ValueError rather than ImportError.
+            parser.error('document class must be given as MODULE.CLASS')
+        try:
+            doc_mod = __import__(mod_name, fromlist=[cls_name])
+            doc_cls = getattr(doc_mod, cls_name)
+        except ImportError:
+            parser.error('unable to import module %s' % mod_name)
+        except AttributeError:
+            parser.error('unable to import class %s from %s' % (cls_name, mod_name))
+
+    if not args:
+        parser.error('no file provided')
+    elif len(args) > 1:
+        parser.error('more than one file provided')
+
+    with open(args[0], 'rb') as stream:
+        doc = doc_cls(stream)
+        dump_document(doc)
\ No newline at end of file