Changeset 322 for pyyaml/trunk
- Timestamp:
- 12/28/08 15:16:50 (3 years ago)
- Location:
- pyyaml/trunk
- Files:
-
- 2 added
- 1 removed
- 31 modified
-
MANIFEST.in (modified) (1 diff)
-
Makefile (modified) (1 diff)
-
setup.cfg (modified) (1 diff)
-
setup.py (modified) (2 diffs)
-
tests/canonical.py (added)
-
tests/data/construct-python-name-module.code (modified) (1 diff)
-
tests/data/empty-document-bug.empty (added)
-
tests/data/invalid-character.stream-error (modified) (1 diff)
-
tests/data/invalid-utf8-byte.stream-error (modified) (1 diff)
-
tests/data/odd-utf16.stream-error (modified) (previous)
-
tests/data/serializer-is-already-opened.dumper-error (modified) (1 diff)
-
tests/data/serializer-is-closed-1.dumper-error (modified) (1 diff)
-
tests/data/serializer-is-closed-2.dumper-error (modified) (1 diff)
-
tests/data/serializer-is-not-opened-1.dumper-error (modified) (1 diff)
-
tests/data/serializer-is-not-opened-2.dumper-error (modified) (1 diff)
-
tests/data/unknown.dumper-error (modified) (1 diff)
-
tests/test_all.py (modified) (1 diff)
-
tests/test_appliance.py (modified) (1 diff)
-
tests/test_build.py (modified) (2 diffs)
-
tests/test_build_ext.py (modified) (2 diffs)
-
tests/test_canonical.py (modified) (1 diff)
-
tests/test_constructor.py (modified) (3 diffs)
-
tests/test_emitter.py (modified) (2 diffs)
-
tests/test_errors.py (modified) (1 diff)
-
tests/test_mark.py (modified) (1 diff)
-
tests/test_reader.py (modified) (1 diff)
-
tests/test_recursive.py (modified) (3 diffs)
-
tests/test_representer.py (modified) (1 diff)
-
tests/test_resolver.py (modified) (1 diff)
-
tests/test_structure.py (modified) (1 diff)
-
tests/test_syck.py (deleted)
-
tests/test_tokens.py (modified) (1 diff)
-
tests/test_yaml.py (modified) (2 diffs)
-
tests/test_yaml_ext.py (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
pyyaml/trunk/MANIFEST.in
r295 r322 1 1 include README LICENSE setup.py 2 2 recursive-include examples *.py *.cfg *.yaml 3 #recursive-include tests *.py4 #recursive-include tests/data *3 recursive-include tests *.py 4 recursive-include tests/data * -
pyyaml/trunk/Makefile
r295 r322 30 30 ${PYTHON} tests/test_build_ext.py ${TEST} 31 31 32 testall: 33 ${PYTHON} setup.py test 34 32 35 dist: 33 36 ${PYTHON} setup.py --with-libyaml sdist --formats=zip,gztar -
pyyaml/trunk/setup.cfg
r287 r322 18 18 # The following options are used to build PyYAML Windows installer 19 19 # for Python 2.3 on my PC: 20 #include_dirs=../../ libyaml/branches/stable/include21 #library_dirs=../../ libyaml/branches/stable/win32/vc6/output/release/lib20 #include_dirs=../../../libyaml/tags/0.1.2/include 21 #library_dirs=../../../libyaml/tags/0.1.2/win32/vc6/output/release/lib 22 22 #define=YAML_DECLARE_STATIC 23 23 24 24 # The following options are used to build PyYAML Windows installer 25 25 # for Python 2.4 and Python 2.5 on my PC: 26 #include_dirs=../../ libyaml/branches/stable/include27 #library_dirs=../../ libyaml/branches/stable/win32/vs2003/output/release/lib26 #include_dirs=../../../libyaml/tags/0.1.2/include 27 #library_dirs=../../../libyaml/tags/0.1.2/win32/vs2003/output/release/lib 28 28 #define=YAML_DECLARE_STATIC 29 29 30 30 # The following options are used to build PyYAML Windows installer 31 31 # for Python 2.6 on my PC: 32 #include_dirs=../../ libyaml/branches/stable/include33 #library_dirs=../../ libyaml/branches/stable/win32/vs2008/output/release/lib32 #include_dirs=../../../libyaml/tags/0.1.2/include 33 #library_dirs=../../../libyaml/tags/0.1.2/win32/vs2008/output/release/lib 34 34 #define=YAML_DECLARE_STATIC 35 -
pyyaml/trunk/setup.py
r314 r322 270 270 271 271 272 class test(Command): 273 274 user_options = [] 275 276 def initialize_options(self): 277 pass 278 279 def finalize_options(self): 280 pass 281 282 def run(self): 283 build_cmd = self.get_finalized_command('build') 284 build_cmd.run() 285 sys.path.insert(0, build_cmd.build_lib) 286 sys.path.insert(0, 'tests') 287 import test_all 288 test_all.main([]) 289 290 272 291 if __name__ == '__main__': 273 292 … … 297 316 'build_ext': build_ext, 298 317 'bdist_rpm': bdist_rpm, 318 'test': test, 299 319 }, 300 320 ) -
pyyaml/trunk/tests/data/construct-python-name-module.code
r146 r322 1 [file, Loader,dump, abs, yaml.tokens]1 [file, yaml.Loader, yaml.dump, abs, yaml.tokens] -
pyyaml/trunk/tests/data/invalid-character.stream-error
r45 r322 1 ------------------------------------------------------------------------------------------------------------------------------- 2 ------------------------------------------------------------------------------------------------------------------------------- 3 ------------------------------------------------------------------------------------------------------------------------------- 4 ------------------------------------------------------------------------------------------------------------------------------- 5 ------------------------------------------------------------------------------------------------------------------------------- 6 ------------------------------------------------------------------------------------------------------------------------------- 7 ------------------------------------------------------------------------------------------------------------------------------- 8 ------------------------------------------------------------------------------------------------------------------------------- 9 ------------------------------------------------------------------------------------------------------------------------------- 10 ------------------------------------------------------------------------------------------------------------------------------- 11 ------------------------------------------------------------------------------------------------------------------------------- 12 ------------------------------------------------------------------------------------------------------------------------------- 13 ------------------------------------------------------------------------------------------------------------------------------- 14 ------------------------------------------------------------------------------------------------------------------------------- 15 ------------------------------------------------------------------------------------------------------------------------------- 16 ------------------------------------------------------------------------------------------------------------------------------- 1 *************************************************************** 2 *************************************************************** 3 *************************************************************** 4 *************************************************************** 5 *************************************************************** 6 *************************************************************** 7 *************************************************************** 8 *************************************************************** 9 *************************************************************** 10 *************************************************************** 11 *************************************************************** 12 *************************************************************** 13 *************************************************************** 14 *************************************************************** 15 *************************************************************** 16 *************************************************************** 17 17 Control character ('\x0'): <-- 18 ------------------------------------------------------------------------------------------------------------------------------- 18 *************************************************************** -
pyyaml/trunk/tests/data/invalid-utf8-byte.stream-error
r45 r322 1 ------------------------------------------------------------------------------------------------------------------------------- 2 ------------------------------------------------------------------------------------------------------------------------------- 3 ------------------------------------------------------------------------------------------------------------------------------- 4 ------------------------------------------------------------------------------------------------------------------------------- 5 ------------------------------------------------------------------------------------------------------------------------------- 6 ------------------------------------------------------------------------------------------------------------------------------- 7 ------------------------------------------------------------------------------------------------------------------------------- 8 ------------------------------------------------------------------------------------------------------------------------------- 9 ------------------------------------------------------------------------------------------------------------------------------- 10 ------------------------------------------------------------------------------------------------------------------------------- 11 ------------------------------------------------------------------------------------------------------------------------------- 12 ------------------------------------------------------------------------------------------------------------------------------- 13 ------------------------------------------------------------------------------------------------------------------------------- 14 ------------------------------------------------------------------------------------------------------------------------------- 15 ------------------------------------------------------------------------------------------------------------------------------- 16 ------------------------------------------------------------------------------------------------------------------------------- 1 *************************************************************** 2 *************************************************************** 3 *************************************************************** 4 *************************************************************** 5 *************************************************************** 6 *************************************************************** 7 *************************************************************** 8 *************************************************************** 9 *************************************************************** 10 *************************************************************** 11 *************************************************************** 12 *************************************************************** 13 *************************************************************** 14 *************************************************************** 15 *************************************************************** 16 *************************************************************** 17 17 Invalid byte ('\xFF'): ÿ <-- 18 ------------------------------------------------------------------------------------------------------------------------------- 18 *************************************************************** -
pyyaml/trunk/tests/data/serializer-is-already-opened.dumper-error
r141 r322 1 dumper = Dumper(StringIO.StringIO())1 dumper = yaml.Dumper(StringIO.StringIO()) 2 2 dumper.open() 3 3 dumper.open() -
pyyaml/trunk/tests/data/serializer-is-closed-1.dumper-error
r141 r322 1 dumper = Dumper(StringIO.StringIO())1 dumper = yaml.Dumper(StringIO.StringIO()) 2 2 dumper.open() 3 3 dumper.close() -
pyyaml/trunk/tests/data/serializer-is-closed-2.dumper-error
r141 r322 1 dumper = Dumper(StringIO.StringIO())1 dumper = yaml.Dumper(StringIO.StringIO()) 2 2 dumper.open() 3 3 dumper.close() 4 dumper.serialize( ScalarNode(tag='!foo', value='bar'))4 dumper.serialize(yaml.ScalarNode(tag='!foo', value='bar')) -
pyyaml/trunk/tests/data/serializer-is-not-opened-1.dumper-error
r141 r322 1 dumper = Dumper(StringIO.StringIO())1 dumper = yaml.Dumper(StringIO.StringIO()) 2 2 dumper.close() -
pyyaml/trunk/tests/data/serializer-is-not-opened-2.dumper-error
r141 r322 1 dumper = Dumper(StringIO.StringIO())2 dumper.serialize( ScalarNode(tag='!foo', value='bar'))1 dumper = yaml.Dumper(StringIO.StringIO()) 2 dumper.serialize(yaml.ScalarNode(tag='!foo', value='bar')) -
pyyaml/trunk/tests/data/unknown.dumper-error
r141 r322 1 safe_dump(object)1 yaml.safe_dump(object) -
pyyaml/trunk/tests/test_all.py
r291 r322 1 1 2 import unittest2 import sys, yaml, test_appliance 3 3 4 def main( ):5 import yaml6 names = ['test_yaml']7 if yaml.__libyaml__:8 names.append('test_yaml_ext')9 suite = unittest.defaultTestLoader.loadTestsFromNames(names)10 runner = unittest.TextTestRunner()11 runner.run(suite)4 def main(args=None): 5 collections = [] 6 import test_yaml 7 collections.append(test_yaml) 8 if yaml.__with_libyaml__: 9 import test_yaml_ext 10 collections.append(test_yaml_ext) 11 test_appliance.run(collections, args) 12 12 13 13 if __name__ == '__main__': -
pyyaml/trunk/tests/test_appliance.py
r312 r322 1 1 2 import unittest, os2 import sys, os, os.path, types, traceback, pprint 3 3 4 from yaml import * 5 from yaml.composer import * 6 from yaml.constructor import * 7 from yaml.resolver import * 4 DATA = 'tests/data' 8 5 9 class TestAppliance(unittest.TestCase): 6 def find_test_functions(collections): 7 if not isinstance(collections, list): 8 collections = [collections] 9 functions = [] 10 for collection in collections: 11 if not isinstance(collection, dict): 12 collection = vars(collection) 13 keys = collection.keys() 14 keys.sort() 15 for key in keys: 16 value = collection[key] 17 if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'): 18 functions.append(value) 19 return functions 10 20 11 DATA = 'tests/data' 12 SKIP_EXT = '.skip' 21 def find_test_filenames(directory): 22 filenames = {} 23 for filename in os.listdir(directory): 24 if os.path.isfile(os.path.join(directory, filename)): 25 base, ext = os.path.splitext(filename) 26 filenames.setdefault(base, []).append(ext) 27 filenames = filenames.items() 28 filenames.sort() 29 return filenames 13 30 14 all_tests = {} 15 for filename in os.listdir(DATA): 16 if os.path.isfile(os.path.join(DATA, filename)): 17 root, ext = os.path.splitext(filename) 18 all_tests.setdefault(root, []).append(ext) 31 def parse_arguments(args): 32 if args is None: 33 args = sys.argv[1:] 34 verbose = False 35 if '-v' in args: 36 verbose = True 37 args.remove('-v') 38 if '--verbose' in args: 39 verbose = True 40 if 'YAML_TEST_VERBOSE' in os.environ: 41 verbose = True 42 include_functions = [] 43 if args: 44 include_functions.append(args.pop(0)) 45 if 'YAML_TEST_FUNCTIONS' in os.environ: 46 include_functions.extend(os.environ['YAML_TEST_FUNCTIONS'].split()) 47 include_filenames = [] 48 include_filenames.extend(args) 49 if 'YAML_TEST_FILENAMES' in os.environ: 50 include_filenames.extend(os.environ['YAML_TEST_FILENAMES'].split()) 51 return include_functions, include_filenames, verbose 19 52 20 def add_tests(cls, method_name, *extensions): 21 for test in cls.all_tests: 22 available_extensions = cls.all_tests[test] 23 if cls.SKIP_EXT in available_extensions: 24 continue 25 for ext in extensions: 26 if ext not in available_extensions: 27 break 28 else: 29 filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions] 30 def test_method(self, test=test, filenames=filenames): 31 getattr(self, '_'+method_name)(test, *filenames) 32 test = test.replace('-', '_').replace('.', '_') 33 try: 34 test_method.__name__ = '%s_%s' % (method_name, test) 35 except TypeError: 36 import new 37 test_method = new.function(test_method.func_code, test_method.func_globals, 38 '%s_%s' % (method_name, test), test_method.func_defaults, 39 test_method.func_closure) 40 setattr(cls, test_method.__name__, test_method) 41 add_tests = classmethod(add_tests) 53 def execute(function, filenames, verbose): 54 if verbose: 55 sys.stdout.write('='*75+'\n') 56 sys.stdout.write('%s(%s)...\n' % (function.func_name, ', '.join(filenames))) 57 try: 58 function(verbose=verbose, *filenames) 59 except Exception, exc: 60 info = sys.exc_info() 61 if isinstance(exc, AssertionError): 62 kind = 'FAILURE' 63 else: 64 kind = 'ERROR' 65 if verbose: 66 traceback.print_exc(limit=1, file=sys.stdout) 67 else: 68 sys.stdout.write(kind[0]) 69 sys.stdout.flush() 70 else: 71 kind = 'SUCCESS' 72 info = None 73 if not verbose: 74 sys.stdout.write('.') 75 sys.stdout.flush() 76 return (function, filenames, kind, info) 42 77 43 class Error(Exception): 44 pass 78 def display(results, verbose): 79 if results and not verbose: 80 sys.stdout.write('\n') 81 total = len(results) 82 failures = 0 83 errors = 0 84 for function, filenames, kind, info in results: 85 if kind == 'SUCCESS': 86 continue 87 if kind == 'FAILURE': 88 failures += 1 89 if kind == 'ERROR': 90 errors += 1 91 sys.stdout.write('='*75+'\n') 92 sys.stdout.write('%s(%s): %s\n' % (function.func_name, ', '.join(filenames), kind)) 93 if kind == 'ERROR': 94 traceback.print_exception(file=sys.stdout, *info) 95 else: 96 sys.stdout.write('Traceback (most recent call last):\n') 97 traceback.print_tb(info[2], file=sys.stdout) 98 sys.stdout.write('%s: see below\n' % info[0].__name__) 99 sys.stdout.write('~'*75+'\n') 100 for arg in info[1].args: 101 pprint.pprint(arg, stream=sys.stdout, indent=2) 102 for filename in filenames: 103 sys.stdout.write('-'*75+'\n') 104 sys.stdout.write('%s:\n' % filename) 105 data = open(filename, 'rb').read() 106 sys.stdout.write(data) 107 if data and data[-1] != '\n': 108 sys.stdout.write('\n') 109 sys.stdout.write('='*75+'\n') 110 sys.stdout.write('TESTS: %s\n' % total) 111 if failures: 112 sys.stdout.write('FAILURES: %s\n' % failures) 113 if errors: 114 sys.stdout.write('ERRORS: %s\n' % errors) 45 115 46 class CanonicalScanner: 116 def run(collections, args=None): 117 test_functions = find_test_functions(collections) 118 test_filenames = find_test_filenames(DATA) 119 include_functions, include_filenames, verbose = parse_arguments(args) 120 results = [] 121 for function in test_functions: 122 if include_functions and function.func_name not in include_functions: 123 continue 124 if function.unittest: 125 for base, exts in test_filenames: 126 if include_filenames and base not in include_filenames: 127 continue 128 filenames = [] 129 for ext in function.unittest: 130 if ext not in exts: 131 break 132 filenames.append(os.path.join(DATA, base+ext)) 133 else: 134 skip_exts = getattr(function, 'skip', []) 135 for skip_ext in skip_exts: 136 if skip_ext in exts: 137 break 138 else: 139 result = execute(function, filenames, verbose) 140 results.append(result) 141 else: 142 result = execute(function, [], verbose) 143 results.append(result) 144 display(results, verbose=verbose) 47 145 48 def __init__(self, data):49 self.data = unicode(data, 'utf-8')+u'\0'50 self.index = 051 self.scan()52 53 def check_token(self, *choices):54 if self.tokens:55 if not choices:56 return True57 for choice in choices:58 if isinstance(self.tokens[0], choice):59 return True60 return False61 62 def peek_token(self):63 if self.tokens:64 return self.tokens[0]65 66 def get_token(self, choice=None):67 token = self.tokens.pop(0)68 if choice and not isinstance(token, choice):69 raise Error("unexpected token "+repr(token))70 return token71 72 def get_token_value(self):73 token = self.get_token()74 return token.value75 76 def scan(self):77 self.tokens = []78 self.tokens.append(StreamStartToken(None, None))79 while True:80 self.find_token()81 ch = self.data[self.index]82 if ch == u'\0':83 self.tokens.append(StreamEndToken(None, None))84 break85 elif ch == u'%':86 self.tokens.append(self.scan_directive())87 elif ch == u'-' and self.data[self.index:self.index+3] == u'---':88 self.index += 389 self.tokens.append(DocumentStartToken(None, None))90 elif ch == u'[':91 self.index += 192 self.tokens.append(FlowSequenceStartToken(None, None))93 elif ch == u'{':94 self.index += 195 self.tokens.append(FlowMappingStartToken(None, None))96 elif ch == u']':97 self.index += 198 self.tokens.append(FlowSequenceEndToken(None, None))99 elif ch == u'}':100 self.index += 1101 self.tokens.append(FlowMappingEndToken(None, None))102 elif ch == u'?':103 self.index += 1104 self.tokens.append(KeyToken(None, None))105 elif ch == u':':106 self.index += 1107 self.tokens.append(ValueToken(None, None))108 elif ch == u',':109 self.index += 1110 self.tokens.append(FlowEntryToken(None, None))111 elif ch == u'*' or ch == u'&':112 self.tokens.append(self.scan_alias())113 elif ch == u'!':114 self.tokens.append(self.scan_tag())115 elif ch == u'"':116 self.tokens.append(self.scan_scalar())117 else:118 raise Error("invalid token")119 120 DIRECTIVE = u'%YAML 1.1'121 122 def scan_directive(self):123 if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \124 self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':125 self.index += len(self.DIRECTIVE)126 return DirectiveToken('YAML', (1, 1), None, None)127 128 def scan_alias(self):129 if self.data[self.index] == u'*':130 TokenClass = AliasToken131 else:132 TokenClass = AnchorToken133 self.index += 1134 start = self.index135 while self.data[self.index] not in u', \n\0':136 self.index += 1137 value = self.data[start:self.index]138 return TokenClass(value, None, None)139 140 def scan_tag(self):141 self.index += 1142 start = self.index143 while self.data[self.index] not in u' \n\0':144 self.index += 1145 value = self.data[start:self.index]146 if value[0] == u'!':147 value = 'tag:yaml.org,2002:'+value[1:]148 elif value[0] == u'<' and value[-1] == u'>':149 value = value[1:-1]150 else:151 value = u'!'+value152 return TagToken(value, None, None)153 154 QUOTE_CODES = {155 'x': 2,156 'u': 4,157 'U': 8,158 }159 160 QUOTE_REPLACES = {161 u'\\': u'\\',162 u'\"': u'\"',163 u' ': u' ',164 u'a': u'\x07',165 u'b': u'\x08',166 u'e': u'\x1B',167 u'f': u'\x0C',168 u'n': u'\x0A',169 u'r': u'\x0D',170 u't': u'\x09',171 u'v': u'\x0B',172 u'N': u'\u0085',173 u'L': u'\u2028',174 u'P': u'\u2029',175 u'_': u'_',176 u'0': u'\x00',177 178 }179 180 def scan_scalar(self):181 self.index += 1182 chunks = []183 start = self.index184 ignore_spaces = False185 while self.data[self.index] != u'"':186 if self.data[self.index] == u'\\':187 ignore_spaces = False188 chunks.append(self.data[start:self.index])189 self.index += 1190 ch = self.data[self.index]191 self.index += 1192 if ch == u'\n':193 ignore_spaces = True194 elif ch in self.QUOTE_CODES:195 length = self.QUOTE_CODES[ch]196 code = int(self.data[self.index:self.index+length], 16)197 chunks.append(unichr(code))198 self.index += length199 else:200 chunks.append(self.QUOTE_REPLACES[ch])201 start = self.index202 elif self.data[self.index] == u'\n':203 chunks.append(self.data[start:self.index])204 chunks.append(u' ')205 self.index += 1206 start = self.index207 ignore_spaces = True208 elif ignore_spaces and self.data[self.index] == u' ':209 self.index += 1210 start = self.index211 else:212 ignore_spaces = False213 self.index += 1214 chunks.append(self.data[start:self.index])215 self.index += 1216 return ScalarToken(u''.join(chunks), False, None, None)217 218 def find_token(self):219 found = False220 while not found:221 while self.data[self.index] in u' \t':222 self.index += 1223 if self.data[self.index] == u'#':224 while self.data[self.index] != u'\n':225 self.index += 1226 if self.data[self.index] == u'\n':227 self.index += 1228 else:229 found = True230 231 class CanonicalParser:232 233 def __init__(self):234 self.events = []235 self.parse()236 237 # stream: STREAM-START document* STREAM-END238 def parse_stream(self):239 self.get_token(StreamStartToken)240 self.events.append(StreamStartEvent(None, None))241 while not self.check_token(StreamEndToken):242 if self.check_token(DirectiveToken, DocumentStartToken):243 self.parse_document()244 else:245 raise Error("document is expected, got "+repr(self.tokens[self.index]))246 self.get_token(StreamEndToken)247 self.events.append(StreamEndEvent(None, None))248 249 # document: DIRECTIVE? DOCUMENT-START node250 def parse_document(self):251 node = None252 if self.check_token(DirectiveToken):253 self.get_token(DirectiveToken)254 self.get_token(DocumentStartToken)255 self.events.append(DocumentStartEvent(None, None))256 self.parse_node()257 self.events.append(DocumentEndEvent(None, None))258 259 # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)260 def parse_node(self):261 if self.check_token(AliasToken):262 self.events.append(AliasEvent(self.get_token_value(), None, None))263 else:264 anchor = None265 if self.check_token(AnchorToken):266 anchor = self.get_token_value()267 tag = None268 if self.check_token(TagToken):269 tag = self.get_token_value()270 if self.check_token(ScalarToken):271 self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))272 elif self.check_token(FlowSequenceStartToken):273 self.events.append(SequenceStartEvent(anchor, tag, None, None))274 self.parse_sequence()275 elif self.check_token(FlowMappingStartToken):276 self.events.append(MappingStartEvent(anchor, tag, None, None))277 self.parse_mapping()278 else:279 raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))280 281 # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END282 def parse_sequence(self):283 self.get_token(FlowSequenceStartToken)284 if not self.check_token(FlowSequenceEndToken):285 self.parse_node()286 while not self.check_token(FlowSequenceEndToken):287 self.get_token(FlowEntryToken)288 if not self.check_token(FlowSequenceEndToken):289 self.parse_node()290 self.get_token(FlowSequenceEndToken)291 self.events.append(SequenceEndEvent(None, None))292 293 # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END294 def parse_mapping(self):295 self.get_token(FlowMappingStartToken)296 if not self.check_token(FlowMappingEndToken):297 self.parse_map_entry()298 while not self.check_token(FlowMappingEndToken):299 self.get_token(FlowEntryToken)300 if not self.check_token(FlowMappingEndToken):301 self.parse_map_entry()302 self.get_token(FlowMappingEndToken)303 self.events.append(MappingEndEvent(None, None))304 305 # map_entry: KEY node VALUE node306 def parse_map_entry(self):307 self.get_token(KeyToken)308 self.parse_node()309 self.get_token(ValueToken)310 self.parse_node()311 312 def parse(self):313 self.parse_stream()314 315 def get_event(self):316 return self.events.pop(0)317 318 def check_event(self, *choices):319 if self.events:320 if not choices:321 return True322 for choice in choices:323 if isinstance(self.events[0], choice):324 return True325 return False326 327 def peek_event(self):328 return self.events[0]329 330 class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):331 332 def __init__(self, stream):333 if hasattr(stream, 'read'):334 stream = stream.read()335 CanonicalScanner.__init__(self, stream)336 CanonicalParser.__init__(self)337 Composer.__init__(self)338 Constructor.__init__(self)339 Resolver.__init__(self)340 341 def canonical_scan(stream):342 return scan(stream, Loader=CanonicalLoader)343 344 def canonical_parse(stream):345 return parse(stream, Loader=CanonicalLoader)346 347 def canonical_compose(stream):348 return compose(stream, Loader=CanonicalLoader)349 350 def canonical_compose_all(stream):351 return compose_all(stream, Loader=CanonicalLoader)352 353 def canonical_load(stream):354 return load(stream, Loader=CanonicalLoader)355 356 def canonical_load_all(stream):357 return load_all(stream, Loader=CanonicalLoader)358 -
pyyaml/trunk/tests/test_build.py
r311 r322 1 1 2 def main():2 if __name__ == '__main__': 3 3 import sys, os, distutils.util 4 4 build_lib = 'build/lib' … … 6 6 sys.path.insert(0, build_lib) 7 7 sys.path.insert(0, build_lib_ext) 8 import test_yaml 9 test_ yaml.main('test_yaml')8 import test_yaml, test_appliance 9 test_appliance.run(test_yaml) 10 10 11 if __name__ == '__main__':12 main()13 -
pyyaml/trunk/tests/test_build_ext.py
r195 r322 1 1 2 2 3 def main():3 if __name__ == '__main__': 4 4 import sys, os, distutils.util 5 5 build_lib = 'build/lib' … … 7 7 sys.path.insert(0, build_lib) 8 8 sys.path.insert(0, build_lib_ext) 9 import test_yaml_ext 10 test_ yaml_ext.main('test_yaml_ext')9 import test_yaml_ext, test_appliance 10 test_appliance.run(test_yaml_ext) 11 11 12 if __name__ == '__main__':13 main()14 -
pyyaml/trunk/tests/test_canonical.py
r136 r322 1 1 2 import test_appliance2 import yaml, canonical 3 3 4 class TestCanonicalAppliance(test_appliance.TestAppliance): 4 def test_canonical_scanner(canonical_filename, verbose=False): 5 data = open(canonical_filename, 'rb').read() 6 tokens = list(yaml.canonical_scan(data)) 7 assert tokens, tokens 8 if verbose: 9 for token in tokens: 10 print token 5 11 6 def _testCanonicalScanner(self, test_name, canonical_filename): 7 data = file(canonical_filename, 'rb').read() 8 tokens = list(test_appliance.canonical_scan(data)) 9 #for token in tokens: 10 # print token 12 test_canonical_scanner.unittest = ['.canonical'] 11 13 12 def _testCanonicalParser(self, test_name, canonical_filename): 13 data = file(canonical_filename, 'rb').read() 14 event = list(test_appliance.canonical_parse(data)) 15 #for event in events: 16 # print event 14 def test_canonical_parser(canonical_filename, verbose=False): 15 data = open(canonical_filename, 'rb').read() 16 events = list(yaml.canonical_parse(data)) 17 assert events, events 18 if verbose: 19 for event in events: 20 print event 17 21 18 TestCanonicalAppliance.add_tests('testCanonicalScanner', '.canonical') 19 TestCanonicalAppliance.add_tests('testCanonicalParser', '.canonical') 22 test_canonical_parser.unittest = ['.canonical'] 20 23 24 def test_canonical_error(data_filename, canonical_filename, verbose=False): 25 data = open(data_filename, 'rb').read() 26 try: 27 output = list(yaml.canonical_load_all(data)) 28 except yaml.YAMLError, exc: 29 if verbose: 30 print exc 31 else: 32 raise AssertionError("expected an exception") 33 34 test_canonical_error.unittest = ['.data', '.canonical'] 35 test_canonical_error.skip = ['.empty'] 36 37 if __name__ == '__main__': 38 import test_appliance 39 test_appliance.run(globals()) 40 -
pyyaml/trunk/tests/test_constructor.py
r234 r322 1 1 2 import test_appliance 2 import yaml 3 import pprint 3 4 4 5 import datetime … … 7 8 except NameError: 8 9 from sets import Set as set 9 10 from yaml import *11 12 10 import yaml.tokens 13 14 class MyLoader(Loader):15 pass16 class MyDumper(Dumper):17 pass18 19 class MyTestClass1:20 21 def __init__(self, x, y=0, z=0):22 self.x = x23 self.y = y24 self.z = z25 26 def __eq__(self, other):27 if isinstance(other, MyTestClass1):28 return self.__class__, self.__dict__ == other.__class__, other.__dict__29 else:30 return False31 32 def construct1(constructor, node):33 mapping = constructor.construct_mapping(node)34 return MyTestClass1(**mapping)35 def represent1(representer, native):36 return representer.represent_mapping("!tag1", native.__dict__)37 38 add_constructor("!tag1", construct1, Loader=MyLoader)39 add_representer(MyTestClass1, represent1, Dumper=MyDumper)40 41 class MyTestClass2(MyTestClass1, YAMLObject):42 43 yaml_loader = MyLoader44 yaml_dumper = MyDumper45 yaml_tag = "!tag2"46 47 def from_yaml(cls, constructor, node):48 x = constructor.construct_yaml_int(node)49 return cls(x=x)50 from_yaml = classmethod(from_yaml)51 52 def to_yaml(cls, representer, native):53 return representer.represent_scalar(cls.yaml_tag, str(native.x))54 to_yaml = classmethod(to_yaml)55 56 class MyTestClass3(MyTestClass2):57 58 yaml_tag = "!tag3"59 60 def from_yaml(cls, constructor, node):61 mapping = constructor.construct_mapping(node)62 if '=' in mapping:63 x = mapping['=']64 del mapping['=']65 mapping['x'] = x66 return cls(**mapping)67 from_yaml = classmethod(from_yaml)68 69 def to_yaml(cls, representer, native):70 return representer.represent_mapping(cls.yaml_tag, native.__dict__)71 to_yaml = classmethod(to_yaml)72 73 class YAMLObject1(YAMLObject):74 75 yaml_loader = MyLoader76 yaml_dumper = MyDumper77 yaml_tag = '!foo'78 79 def __init__(self, my_parameter=None, my_another_parameter=None):80 self.my_parameter = my_parameter81 self.my_another_parameter = my_another_parameter82 83 def __eq__(self, other):84 if isinstance(other, YAMLObject1):85 return self.__class__, self.__dict__ == other.__class__, other.__dict__86 else:87 return False88 89 class YAMLObject2(YAMLObject):90 91 yaml_loader = MyLoader92 yaml_dumper = MyDumper93 yaml_tag = '!bar'94 95 def __init__(self, foo=1, bar=2, baz=3):96 self.foo = foo97 self.bar = bar98 self.baz = baz99 100 def __getstate__(self):101 return {1: self.foo, 2: self.bar, 3: self.baz}102 103 def __setstate__(self, state):104 self.foo = state[1]105 self.bar = state[2]106 self.baz = state[3]107 108 def __eq__(self, other):109 if isinstance(other, YAMLObject2):110 return self.__class__, self.__dict__ == other.__class__, other.__dict__111 else:112 return False113 114 class AnObject(object):115 116 def __new__(cls, foo=None, bar=None, baz=None):117 self = object.__new__(cls)118 self.foo = foo119 self.bar = bar120 self.baz = baz121 return self122 123 def __cmp__(self, other):124 return cmp((type(self), self.foo, self.bar, self.baz),125 (type(other), other.foo, other.bar, other.baz))126 127 def __eq__(self, other):128 return type(self) is type(other) and \129 (self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz)130 131 class AnInstance:132 133 def __init__(self, foo=None, bar=None, baz=None):134 self.foo = foo135 self.bar = bar136 self.baz = baz137 138 def __cmp__(self, other):139 return cmp((type(self), self.foo, self.bar, self.baz),140 (type(other), other.foo, other.bar, other.baz))141 142 def __eq__(self, other):143 return type(self) is type(other) and \144 (self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz)145 146 class AState(AnInstance):147 148 def __getstate__(self):149 return {150 '_foo': self.foo,151 '_bar': self.bar,152 '_baz': self.baz,153 }154 155 def __setstate__(self, state):156 self.foo = state['_foo']157 self.bar = state['_bar']158 self.baz = state['_baz']159 160 class ACustomState(AnInstance):161 162 def __getstate__(self):163 return (self.foo, self.bar, self.baz)164 165 def __setstate__(self, state):166 self.foo, self.bar, self.baz = state167 168 class InitArgs(AnInstance):169 170 def __getinitargs__(self):171 return (self.foo, self.bar, self.baz)172 173 def __getstate__(self):174 return {}175 176 class InitArgsWithState(AnInstance):177 178 def __getinitargs__(self):179 return (self.foo, self.bar)180 181 def __getstate__(self):182 return self.baz183 184 def __setstate__(self, state):185 self.baz = state186 187 class NewArgs(AnObject):188 189 def __getnewargs__(self):190 return (self.foo, self.bar, self.baz)191 192 def __getstate__(self):193 return {}194 195 class NewArgsWithState(AnObject):196 197 def __getnewargs__(self):198 return (self.foo, self.bar)199 200 def __getstate__(self):201 return self.baz202 203 def __setstate__(self, state):204 self.baz = state205 206 class Reduce(AnObject):207 208 def __reduce__(self):209 return self.__class__, (self.foo, self.bar, self.baz)210 211 class ReduceWithState(AnObject):212 213 def __reduce__(self):214 return self.__class__, (self.foo, self.bar), self.baz215 216 def __setstate__(self, state):217 self.baz = state218 219 class MyInt(int):220 221 def __eq__(self, other):222 return type(self) is type(other) and int(self) == int(other)223 224 class MyList(list):225 226 def __init__(self, n=1):227 self.extend([None]*n)228 229 def __eq__(self, other):230 return type(self) is type(other) and list(self) == list(other)231 232 class MyDict(dict):233 234 def __init__(self, n=1):235 for k in range(n):236 self[k] = None237 238 def __eq__(self, other):239 return type(self) is type(other) and dict(self) == dict(other)240 241 class FixedOffset(datetime.tzinfo):242 243 def __init__(self, offset, name):244 self.__offset = datetime.timedelta(minutes=offset)245 self.__name = name246 247 def utcoffset(self, dt):248 return self.__offset249 250 def tzname(self, dt):251 return self.__name252 253 def dst(self, dt):254 return datetime.timedelta(0)255 256 11 257 12 def execute(code): … … 259 14 return value 260 15 261 class TestConstructorTypes(test_appliance.TestAppliance): 262 263 def _testTypes(self, test_name, data_filename, code_filename): 264 data1 = None 265 data2 = None 16 def _make_objects(): 17 global MyLoader, MyDumper, MyTestClass1, MyTestClass2, MyTestClass3, YAMLObject1, YAMLObject2, \ 18 AnObject, AnInstance, AState, ACustomState, InitArgs, InitArgsWithState, \ 19 NewArgs, NewArgsWithState, Reduce, ReduceWithState, MyInt, MyList, MyDict, \ 20 FixedOffset, execute 21 22 class MyLoader(yaml.Loader): 23 pass 24 class MyDumper(yaml.Dumper): 25 pass 26 27 class MyTestClass1: 28 def __init__(self, x, y=0, z=0): 29 self.x = x 30 self.y = y 31 self.z = z 32 def __eq__(self, other): 33 if isinstance(other, MyTestClass1): 34 return self.__class__, self.__dict__ == other.__class__, other.__dict__ 35 else: 36 return False 37 38 def construct1(constructor, node): 39 mapping = constructor.construct_mapping(node) 40 return MyTestClass1(**mapping) 41 def represent1(representer, native): 42 return representer.represent_mapping("!tag1", native.__dict__) 43 44 yaml.add_constructor("!tag1", construct1, Loader=MyLoader) 45 yaml.add_representer(MyTestClass1, represent1, Dumper=MyDumper) 46 47 class MyTestClass2(MyTestClass1, yaml.YAMLObject): 48 yaml_loader = MyLoader 49 yaml_dumper = MyDumper 50 yaml_tag = "!tag2" 51 def from_yaml(cls, constructor, node): 52 x = constructor.construct_yaml_int(node) 53 return cls(x=x) 54 from_yaml = classmethod(from_yaml) 55 def to_yaml(cls, representer, native): 56 return representer.represent_scalar(cls.yaml_tag, str(native.x)) 57 to_yaml = classmethod(to_yaml) 58 59 class MyTestClass3(MyTestClass2): 60 yaml_tag = "!tag3" 61 def from_yaml(cls, constructor, node): 62 mapping = constructor.construct_mapping(node) 63 if '=' in mapping: 64 x = mapping['='] 65 del mapping['='] 66 mapping['x'] = x 67 return cls(**mapping) 68 from_yaml = classmethod(from_yaml) 69 def to_yaml(cls, representer, native): 70 return representer.represent_mapping(cls.yaml_tag, native.__dict__) 71 to_yaml = classmethod(to_yaml) 72 73 class YAMLObject1(yaml.YAMLObject): 74 yaml_loader = MyLoader 75 yaml_dumper = MyDumper 76 yaml_tag = '!foo' 77 def __init__(self, my_parameter=None, my_another_parameter=None): 78 self.my_parameter = my_parameter 79 self.my_another_parameter = my_another_parameter 80 def __eq__(self, other): 81 if isinstance(other, YAMLObject1): 82 return self.__class__, self.__dict__ == other.__class__, other.__dict__ 83 else: 84 return False 85 86 class YAMLObject2(yaml.YAMLObject): 87 yaml_loader = MyLoader 88 yaml_dumper = MyDumper 89 yaml_tag = '!bar' 90 def __init__(self, foo=1, bar=2, baz=3): 91 self.foo = foo 92 self.bar = bar 93 self.baz = baz 94 def __getstate__(self): 95 return {1: self.foo, 2: self.bar, 3: self.baz} 96 def __setstate__(self, state): 97 self.foo = state[1] 98 self.bar = state[2] 99 self.baz = state[3] 100 def __eq__(self, other): 101 if isinstance(other, YAMLObject2): 102 return self.__class__, self.__dict__ == other.__class__, other.__dict__ 103 else: 104 return False 105 106 class AnObject(object): 107 def __new__(cls, foo=None, bar=None, baz=None): 108 self = object.__new__(cls) 109 self.foo = foo 110 self.bar = bar 111 self.baz = baz 112 return self 113 def __cmp__(self, other): 114 return cmp((type(self), self.foo, self.bar, self.baz), 115 (type(other), other.foo, other.bar, other.baz)) 116 def __eq__(self, other): 117 return type(self) is type(other) and \ 118 (self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz) 119 120 class AnInstance: 121 def __init__(self, foo=None, bar=None, baz=None): 122 self.foo = foo 123 self.bar = bar 124 self.baz = baz 125 def __cmp__(self, other): 126 return cmp((type(self), self.foo, self.bar, self.baz), 127 (type(other), other.foo, other.bar, other.baz)) 128 def __eq__(self, other): 129 return type(self) is type(other) and \ 130 (self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz) 131 132 class AState(AnInstance): 133 def __getstate__(self): 134 return { 135 '_foo': self.foo, 136 '_bar': self.bar, 137 '_baz': self.baz, 138 } 139 def __setstate__(self, state): 140 self.foo = state['_foo'] 141 self.bar = state['_bar'] 142 self.baz = state['_baz'] 143 144 class ACustomState(AnInstance): 145 def __getstate__(self): 146 return (self.foo, self.bar, self.baz) 147 def __setstate__(self, state): 148 self.foo, self.bar, self.baz = state 149 150 class InitArgs(AnInstance): 151 def __getinitargs__(self): 152 return (self.foo, self.bar, self.baz) 153 def __getstate__(self): 154 return {} 155 156 class InitArgsWithState(AnInstance): 157 def __getinitargs__(self): 158 return (self.foo, self.bar) 159 def __getstate__(self): 160 return self.baz 161 def __setstate__(self, state): 162 self.baz = state 163 164 class NewArgs(AnObject): 165 def __getnewargs__(self): 166 return (self.foo, self.bar, self.baz) 167 def __getstate__(self): 168 return {} 169 170 class NewArgsWithState(AnObject): 171 def __getnewargs__(self): 172 return (self.foo, self.bar) 173 def __getstate__(self): 174 return self.baz 175 def __setstate__(self, state): 176 self.baz = state 177 178 class Reduce(AnObject): 179 def __reduce__(self): 180 return self.__class__, (self.foo, self.bar, self.baz) 181 182 class ReduceWithState(AnObject): 183 def __reduce__(self): 184 return self.__class__, (self.foo, self.bar), self.baz 185 def __setstate__(self, state): 186 self.baz = state 187 188 class MyInt(int): 189 def __eq__(self, other): 190 return type(self) is type(other) and int(self) == int(other) 191 192 class MyList(list): 193 def __init__(self, n=1): 194 self.extend([None]*n) 195 def __eq__(self, other): 196 return type(self) is type(other) and list(self) == list(other) 197 198 class MyDict(dict): 199 def __init__(self, n=1): 200 for k in range(n): 201 self[k] = None 202 def __eq__(self, other): 203 return type(self) is type(other) and dict(self) == dict(other) 204 205 class FixedOffset(datetime.tzinfo): 206 def __init__(self, offset, name): 207 self.__offset = datetime.timedelta(minutes=offset) 208 self.__name = name 209 def utcoffset(self, dt): 210 return self.__offset 211 def tzname(self, dt): 212 return self.__name 213 def dst(self, dt): 214 return datetime.timedelta(0) 215 216 def _load_code(expression): 217 return eval(expression) 218 219 def _serialize_value(data): 220 if isinstance(data, list): 221 return '[%s]' % ', '.join(map(_serialize_value, data)) 222 elif isinstance(data, dict): 223 items = [] 224 for key, value in data.items(): 225 key = _serialize_value(key) 226 value = _serialize_value(value) 227 items.append("%s: %s" % (key, value)) 228 items.sort() 229 return '{%s}' % ', '.join(items) 230 elif isinstance(data, datetime.datetime): 231 return repr(data.utctimetuple()) 232 elif isinstance(data, unicode): 233 return data.encode('utf-8') 234 else: 235 return str(data) 236 237 def test_constructor_types(data_filename, code_filename, verbose=False): 238 _make_objects() 239 native1 = None 240 native2 = None 241 try: 242 native1 = list(yaml.load_all(open(data_filename, 'rb'), Loader=MyLoader)) 243 if len(native1) == 1: 244 native1 = native1[0] 245 native2 = _load_code(open(code_filename, 'rb').read()) 266 246 try: 267 data1 = list(load_all(file(data_filename, 'rb'), Loader=MyLoader)) 268 if len(data1) == 1: 269 data1 = data1[0] 270 data2 = eval(file(code_filename, 'rb').read()) 271 self.failUnlessEqual(type(data1), type(data2)) 272 try: 273 self.failUnlessEqual(data1, data2) 274 except (AssertionError, TypeError): 275 if isinstance(data1, dict): 276 data1 = [(repr(key), value) for key, value in data1.items()] 277 data1.sort() 278 data1 = repr(data1) 279 data2 = [(repr(key), value) for key, value in data2.items()] 280 data2.sort() 281 data2 = repr(data2) 282 if data1 != data2: 283 raise 284 elif isinstance(data1, list): 285 self.failUnlessEqual(type(data1), type(data2)) 286 self.failUnlessEqual(len(data1), len(data2)) 287 for item1, item2 in zip(data1, data2): 288 if (item1 != item1 or (item1 == 0.0 and item1 == 1.0)) and \ 289 (item2 != item2 or (item2 == 0.0 and item2 == 1.0)): 290 continue 291 if isinstance(item1, datetime.datetime) \ 292 and isinstance(item2, datetime.datetime): 293 self.failUnlessEqual(item1.microsecond, 294 item2.microsecond) 295 if isinstance(item1, datetime.datetime): 296 item1 = item1.utctimetuple() 297 if isinstance(item2, datetime.datetime): 298 item2 = item2.utctimetuple() 299 self.failUnlessEqual(item1, item2) 300 else: 301 raise 302 except: 303 print 304 print "DATA:" 305 print file(data_filename, 'rb').read() 306 print "CODE:" 307 print file(code_filename, 'rb').read() 308 print "NATIVES1:", data1 309 print "NATIVES2:", data2 310 raise 311 312 TestConstructorTypes.add_tests('testTypes', '.data', '.code') 313 247 if native1 == native2: 248 return 249 except TypeError: 250 pass 251 if verbose: 252 print "SERIALIZED NATIVE1:" 253 print _serialize_value(native1) 254 print "SERIALIZED NATIVE2:" 255 print _serialize_value(native2) 256 assert _serialize_value(native1) == _serialize_value(native2), (native1, native2) 257 finally: 258 if verbose: 259 print "NATIVE1:" 260 pprint.pprint(native1) 261 print "NATIVE2:" 262 pprint.pprint(native2) 263 264 test_constructor_types.unittest = ['.data', '.code'] 265 266 if __name__ == '__main__': 267 import sys, test_constructor 268 sys.modules['test_constructor'] = sys.modules['__main__'] 269 import test_appliance 270 test_appliance.run(globals()) 271 -
pyyaml/trunk/tests/test_emitter.py
r312 r322 1 1 2 import test_appliance, sys, StringIO3 4 from yaml import *5 2 import yaml 6 3 7 class TestEmitter(test_appliance.TestAppliance): 4 def _compare_events(events1, events2): 5 assert len(events1) == len(events2), (events1, events2) 6 for event1, event2 in zip(events1, events2): 7 assert event1.__class__ == event2.__class__, (event1, event2) 8 if isinstance(event1, yaml.NodeEvent): 9 assert event1.anchor == event2.anchor, (event1, event2) 10 if isinstance(event1, yaml.CollectionStartEvent): 11 assert event1.tag == event2.tag, (event1, event2) 12 if isinstance(event1, yaml.ScalarEvent): 13 if True not in event1.implicit+event2.implicit: 14 assert event1.tag == event2.tag, (event1, event2) 15 assert event1.value == event2.value, (event1, event2) 8 16 9 def _testEmitterOnData(self, test_name, canonical_filename, data_filename): 10 self._testEmitter(test_name, data_filename) 17 def test_emitter_on_data(data_filename, canonical_filename, verbose=False): 18 events = list(yaml.parse(open(data_filename, 'rb'))) 19 output = yaml.emit(events) 20 if verbose: 21 print "OUTPUT:" 22 print output 23 new_events = list(yaml.parse(output)) 24 _compare_events(events, new_events) 11 25 12 def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename): 13 self._testEmitter(test_name, canonical_filename, False) 26 test_emitter_on_data.unittest = ['.data', '.canonical'] 14 27 15 def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename): 16 self._testEmitter(test_name, canonical_filename, True) 28 def test_emitter_on_canonical(canonical_filename, verbose=False): 29 events = list(yaml.parse(open(canonical_filename, 'rb'))) 30 for canonical in [False, True]: 31 output = yaml.emit(events, canonical=canonical) 32 if verbose: 33 print "OUTPUT (canonical=%s):" % canonical 34 print output 35 new_events = list(yaml.parse(output)) 36 _compare_events(events, new_events) 17 37 18 def _testEmitter(self, test_name, filename, canonical=None): 19 events = list(parse(file(filename, 'rb'))) 20 #self._dump(filename, events, canonical) 21 stream = StringIO.StringIO() 22 emit(events, stream, canonical=canonical) 23 data = stream.getvalue() 24 new_events = list(parse(data)) 25 for event, new_event in zip(events, new_events): 26 self.failUnlessEqual(event.__class__, new_event.__class__) 27 if isinstance(event, NodeEvent): 28 self.failUnlessEqual(event.anchor, new_event.anchor) 29 if isinstance(event, CollectionStartEvent): 30 self.failUnlessEqual(event.tag, new_event.tag) 31 if isinstance(event, ScalarEvent): 32 #self.failUnlessEqual(event.implicit, new_event.implicit) 33 if True not in event.implicit+new_event.implicit: 34 self.failUnlessEqual(event.tag, new_event.tag) 35 self.failUnlessEqual(event.value, new_event.value) 38 test_emitter_on_canonical.unittest = ['.canonical'] 36 39 37 def _testEmitterStyles(self, test_name, canonical_filename, data_filename): 38 for filename in [canonical_filename, data_filename]: 39 events = list(parse(file(filename, 'rb'))) 40 for flow_style in [False, True]: 41 for style in ['|', '>', '"', '\'', '']: 42 styled_events = [] 43 for event in events: 44 if isinstance(event, ScalarEvent): 45 event = ScalarEvent(event.anchor, event.tag, 46 event.implicit, event.value, style=style) 47 elif isinstance(event, SequenceStartEvent): 48 event = SequenceStartEvent(event.anchor, event.tag, 49 event.implicit, flow_style=flow_style) 50 elif isinstance(event, MappingStartEvent): 51 event = MappingStartEvent(event.anchor, event.tag, 52 event.implicit, flow_style=flow_style) 53 styled_events.append(event) 54 stream = StringIO.StringIO() 55 emit(styled_events, stream) 56 data = stream.getvalue() 57 #print data 58 new_events = list(parse(data)) 59 for event, new_event in zip(events, new_events): 60 self.failUnlessEqual(event.__class__, new_event.__class__) 61 if isinstance(event, NodeEvent): 62 self.failUnlessEqual(event.anchor, new_event.anchor) 63 if isinstance(event, CollectionStartEvent): 64 self.failUnlessEqual(event.tag, new_event.tag) 65 if isinstance(event, ScalarEvent): 66 #self.failUnlessEqual(event.implicit, new_event.implicit) 67 if True not in event.implicit+new_event.implicit: 68 self.failUnlessEqual(event.tag, new_event.tag) 69 self.failUnlessEqual(event.value, new_event.value) 40 def test_emitter_styles(data_filename, canonical_filename, verbose=False): 41 for filename in [data_filename, canonical_filename]: 42 events = list(yaml.parse(open(filename, 'rb'))) 43 for flow_style in [False, True]: 44 for style in ['|', '>', '"', '\'', '']: 45 styled_events = [] 46 for event in events: 47 if isinstance(event, yaml.ScalarEvent): 48 event = yaml.ScalarEvent(event.anchor, event.tag, 49 event.implicit, event.value, style=style) 50 elif isinstance(event, yaml.SequenceStartEvent): 51 event = yaml.SequenceStartEvent(event.anchor, event.tag, 52 event.implicit, flow_style=flow_style) 53 elif isinstance(event, yaml.MappingStartEvent): 54 event = yaml.MappingStartEvent(event.anchor, event.tag, 55 event.implicit, flow_style=flow_style) 56 styled_events.append(event) 57 output = yaml.emit(styled_events) 58 if verbose: 59 print "OUTPUT (filename=%r, flow_style=%r, style=%r)" % (filename, flow_style, style) 60 print output 61 new_events = list(yaml.parse(output)) 62 _compare_events(events, new_events) 70 63 64 test_emitter_styles.unittest = ['.data', '.canonical'] 71 65 72 def _dump(self, filename, events, canonical): 73 print "="*30 74 print "ORIGINAL DOCUMENT:" 75 print file(filename, 'rb').read() 76 print '-'*30 77 print "EMITTED DOCUMENT:" 78 emit(events, sys.stdout, canonical=canonical) 79 80 TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data') 81 TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical') 82 TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical') 83 TestEmitter.add_tests('testEmitterStyles', '.canonical', '.data') 84 85 class EventsLoader(Loader): 66 class EventsLoader(yaml.Loader): 86 67 87 68 def construct_event(self, node): 88 if isinstance(node, ScalarNode):69 if isinstance(node, yaml.ScalarNode): 89 70 mapping = {} 90 71 else: … … 105 86 EventsLoader.add_constructor(None, EventsLoader.construct_event) 106 87 107 class TestEmitterEvents(test_appliance.TestAppliance): 88 def test_emitter_events(events_filename, verbose=False): 89 events = list(yaml.load(open(events_filename, 'rb'), Loader=EventsLoader)) 90 output = yaml.emit(events) 91 if verbose: 92 print "OUTPUT:" 93 print output 94 new_events = list(yaml.parse(output)) 95 _compare_events(events, new_events) 108 96 109 def _testEmitterEvents(self, test_name, events_filename): 110 events = list(load(file(events_filename, 'rb'), Loader=EventsLoader)) 111 #self._dump(events_filename, events) 112 stream = StringIO.StringIO() 113 emit(events, stream) 114 data = stream.getvalue() 115 new_events = list(parse(data)) 116 self.failUnlessEqual(len(events), len(new_events)) 117 for event, new_event in zip(events, new_events): 118 self.failUnlessEqual(event.__class__, new_event.__class__) 119 if isinstance(event, NodeEvent): 120 self.failUnlessEqual(event.anchor, new_event.anchor) 121 if isinstance(event, CollectionStartEvent): 122 self.failUnlessEqual(event.tag, new_event.tag) 123 if isinstance(event, ScalarEvent): 124 self.failUnless(event.implicit == new_event.implicit 125 or event.tag == new_event.tag) 126 self.failUnlessEqual(event.value, new_event.value) 97 if __name__ == '__main__': 98 import test_appliance 99 test_appliance.run(globals()) 127 100 128 def _dump(self, events_filename, events):129 print "="*30130 print "EVENTS:"131 print file(events_filename, 'rb').read()132 print '-'*30133 print "OUTPUT:"134 emit(events, sys.stdout)135 136 TestEmitterEvents.add_tests('testEmitterEvents', '.events')137 -
pyyaml/trunk/tests/test_errors.py
r260 r322 1 1 2 import test_appliance 3 import test_emitter 2 import yaml, test_emitter 4 3 5 import StringIO 4 def test_loader_error(error_filename, verbose=False): 5 try: 6 list(yaml.load_all(open(error_filename, 'rb'))) 7 except yaml.YAMLError, exc: 8 if verbose: 9 print "%s:" % exc.__class__.__name__, exc 10 else: 11 raise AssertionError("expected an exception") 6 12 7 from yaml import * 13 test_loader_error.unittest = ['.loader-error'] 8 14 9 class TestErrors(test_appliance.TestAppliance): 15 def test_loader_error_string(error_filename, verbose=False): 16 try: 17 list(yaml.load_all(open(error_filename, 'rb').read())) 18 except yaml.YAMLError, exc: 19 if verbose: 20 print "%s:" % exc.__class__.__name__, exc 21 else: 22 raise AssertionError("expected an exception") 10 23 11 def _testLoaderErrors(self, test_name, invalid_filename): 12 #self._load(invalid_filename) 13 self.failUnlessRaises(YAMLError, lambda: self._load(invalid_filename)) 24 test_loader_error_string.unittest = ['.loader-error'] 14 25 15 def _testLoaderStringErrors(self, test_name, invalid_filename): 16 #self._load_string(invalid_filename) 17 self.failUnlessRaises(YAMLError, lambda: self._load_string(invalid_filename)) 26 def test_loader_error_single(error_filename, verbose=False): 27 try: 28 yaml.load(open(error_filename, 'rb').read()) 29 except yaml.YAMLError, exc: 30 if verbose: 31 print "%s:" % exc.__class__.__name__, exc 32 else: 33 raise AssertionError("expected an exception") 18 34 19 def _testLoaderSingleErrors(self, test_name, invalid_filename): 20 #self._load_single(invalid_filename) 21 self.failUnlessRaises(YAMLError, lambda: self._load_single(invalid_filename)) 35 test_loader_error_single.unittest = ['.single-loader-error'] 22 36 23 def _testEmitterErrors(self, test_name, invalid_filename): 24 events = list(load(file(invalid_filename, 'rb').read(), 25 Loader=test_emitter.EventsLoader)) 26 self.failUnlessRaises(YAMLError, lambda: self._emit(events)) 37 def test_emitter_error(error_filename, verbose=False): 38 events = list(yaml.load(open(error_filename, 'rb'), 39 Loader=test_emitter.EventsLoader)) 40 try: 41 yaml.emit(events) 42 except yaml.YAMLError, exc: 43 if verbose: 44 print "%s:" % exc.__class__.__name__, exc 45 else: 46 raise AssertionError("expected an exception") 27 47 28 def _testDumperErrors(self, test_name, invalid_filename): 29 code = file(invalid_filename, 'rb').read() 30 self.failUnlessRaises(YAMLError, lambda: self._dump(code)) 48 test_emitter_error.unittest = ['.emitter-error'] 31 49 32 def _dump(self, code): 33 try: 34 exec code 35 except YAMLError, exc: 36 #print '.'*70 37 #print "%s:" % exc.__class__.__name__, exc 38 raise 50 def test_dumper_error(error_filename, verbose=False): 51 code = open(error_filename, 'rb').read() 52 try: 53 import yaml, StringIO 54 exec code 55 except yaml.YAMLError, exc: 56 if verbose: 57 print "%s:" % exc.__class__.__name__, exc 58 else: 59 raise AssertionError("expected an exception") 39 60 40 def _emit(self, events): 41 try: 42 emit(events) 43 except YAMLError, exc: 44 #print '.'*70 45 #print "%s:" % exc.__class__.__name__, exc 46 raise 61 test_dumper_error.unittest = ['.dumper-error'] 47 62 48 def _load(self, filename): 49 try: 50 return list(load_all(file(filename, 'rb'))) 51 except YAMLError, exc: 52 #except ScannerError, exc: 53 #except ParserError, exc: 54 #except ComposerError, exc: 55 #except ConstructorError, exc: 56 #print '.'*70 57 #print "%s:" % exc.__class__.__name__, exc 58 raise 63 if __name__ == '__main__': 64 import test_appliance 65 test_appliance.run(globals()) 59 66 60 def _load_string(self, filename):61 try:62 return list(load_all(file(filename, 'rb').read()))63 except YAMLError, exc:64 #except ScannerError, exc:65 #except ParserError, exc:66 #except ComposerError, exc:67 #except ConstructorError, exc:68 #print '.'*7069 #print "%s:" % filename70 #print "%s:" % exc.__class__.__name__, exc71 raise72 73 def _load_single(self, filename):74 try:75 return load(file(filename, 'rb').read())76 except YAMLError, exc:77 #except ScannerError, exc:78 #except ParserError, exc:79 #except ComposerError, exc:80 #except ConstructorError, exc:81 #print '.'*7082 #print "%s:" % filename83 #print "%s:" % exc.__class__.__name__, exc84 raise85 86 TestErrors.add_tests('testLoaderErrors', '.loader-error')87 TestErrors.add_tests('testLoaderStringErrors', '.loader-error')88 TestErrors.add_tests('testLoaderSingleErrors', '.single-loader-error')89 TestErrors.add_tests('testEmitterErrors', '.emitter-error')90 TestErrors.add_tests('testDumperErrors', '.dumper-error')91 -
pyyaml/trunk/tests/test_mark.py
r120 r322 1 1 2 import test_appliance2 import yaml 3 3 4 from yaml.reader import Mark 4 def test_marks(marks_filename, verbose=False): 5 inputs = open(marks_filename, 'rb').read().split('---\n')[1:] 6 for input in inputs: 7 index = 0 8 line = 0 9 column = 0 10 while input[index] != '*': 11 if input[index] == '\n': 12 line += 1 13 column = 0 14 else: 15 column += 1 16 index += 1 17 mark = yaml.Mark(marks_filename, index, line, column, unicode(input), index) 18 snippet = mark.get_snippet(indent=2, max_length=79) 19 if verbose: 20 print snippet 21 assert isinstance(snippet, str), type(snippet) 22 assert snippet.count('\n') == 1, snippet.count('\n') 23 data, pointer = snippet.split('\n') 24 assert len(data) < 82, len(data) 25 assert data[len(pointer)-1] == '*', data[len(pointer)-1] 5 26 6 class TestMark(test_appliance.TestAppliance): 27 test_marks.unittest = ['.marks'] 7 28 8 def _testMarks(self, test_name, marks_filename): 9 inputs = file(marks_filename, 'rb').read().split('---\n')[1:] 10 for input in inputs: 11 index = 0 12 line = 0 13 column = 0 14 while input[index] != '*': 15 if input[index] == '\n': 16 line += 1 17 column = 0 18 else: 19 column += 1 20 index += 1 21 mark = Mark(test_name, index, line, column, unicode(input), index) 22 snippet = mark.get_snippet(indent=2, max_length=79) 23 #print "INPUT:" 24 #print input 25 #print "SNIPPET:" 26 #print snippet 27 self.failUnless(isinstance(snippet, str)) 28 self.failUnlessEqual(snippet.count('\n'), 1) 29 data, pointer = snippet.split('\n') 30 self.failUnless(len(data) < 82) 31 self.failUnlessEqual(data[len(pointer)-1], '*') 29 if __name__ == '__main__': 30 import test_appliance 31 test_appliance.run(globals()) 32 32 33 TestMark.add_tests('testMarks', '.marks')34 -
pyyaml/trunk/tests/test_reader.py
r46 r322 1 1 2 import test_appliance 3 from yaml.reader import Reader, ReaderError 4 2 import yaml.reader 5 3 import codecs 6 4 7 class TestReaderErrors(test_appliance.TestAppliance): 8 9 def _testReaderUnicodeErrors(self, test_name, stream_filename): 10 for encoding in ['utf-8', 'utf-16-le', 'utf-16-be']: 11 try: 12 data = unicode(file(stream_filename, 'rb').read(), encoding) 13 break 14 except: 15 pass 16 else: 17 return 18 #self._load(data) 19 self.failUnlessRaises(ReaderError, 20 lambda: self._load(data)) 21 #self._load(codecs.open(stream_filename, encoding=encoding)) 22 self.failUnlessRaises(ReaderError, 23 lambda: self._load(codecs.open(stream_filename, encoding=encoding))) 24 25 def _testReaderStringErrors(self, test_name, stream_filename): 26 data = file(stream_filename, 'rb').read() 27 #self._load(data) 28 self.failUnlessRaises(ReaderError, lambda: self._load(data)) 29 30 def _testReaderFileErrors(self, test_name, stream_filename): 31 data = file(stream_filename, 'rb') 32 #self._load(data) 33 self.failUnlessRaises(ReaderError, lambda: self._load(data)) 34 35 def _load(self, data): 36 stream = Reader(data) 5 def _run_reader(data, verbose): 6 try: 7 stream = yaml.reader.Reader(data) 37 8 while stream.peek() != u'\0': 38 9 stream.forward() 10 except yaml.reader.ReaderError, exc: 11 if verbose: 12 print exc 13 else: 14 raise AssertionError("expected an exception") 39 15 40 TestReaderErrors.add_tests('testReaderUnicodeErrors', '.stream-error') 41 TestReaderErrors.add_tests('testReaderStringErrors', '.stream-error') 42 TestReaderErrors.add_tests('testReaderFileErrors', '.stream-error') 16 def test_stream_error(error_filename, verbose=False): 17 _run_reader(open(error_filename, 'rb'), verbose) 18 _run_reader(open(error_filename, 'rb').read(), verbose) 19 for encoding in ['utf-8', 'utf-16-le', 'utf-16-be']: 20 try: 21 data = unicode(open(error_filename, 'rb').read(), encoding) 22 break 23 except UnicodeDecodeError: 24 pass 25 else: 26 return 27 _run_reader(data, verbose) 28 _run_reader(codecs.open(error_filename, encoding=encoding), verbose) 43 29 30 test_stream_error.unittest = ['.stream-error'] 44 31 32 if __name__ == '__main__': 33 import test_appliance 34 test_appliance.run(globals()) 35 -
pyyaml/trunk/tests/test_recursive.py
r222 r322 1 1 2 import test_appliance 3 4 from yaml import * 2 import yaml 5 3 6 4 class AnInstance: … … 25 23 self.foo, self.bar = state['attributes'] 26 24 27 class TestRecursive(test_appliance.TestAppliance): 28 29 def _testRecursive(self, test_name, recursive_filename): 30 exec file(recursive_filename, 'r').read() 31 value1 = value 32 output1 = None 33 value2 = None 34 output2 = None 35 try: 36 output1 = dump(value1) 37 #print "OUTPUT %s:" % test_name 38 #print output1 39 value2 = load(output1) 40 output2 = dump(value2) 41 self.failUnlessEqual(output1, output2) 42 except: 25 def test_recursive(recursive_filename, verbose=False): 26 exec open(recursive_filename, 'rb').read() 27 value1 = value 28 output1 = None 29 value2 = None 30 output2 = None 31 try: 32 output1 = yaml.dump(value1) 33 value2 = yaml.load(output1) 34 output2 = yaml.dump(value2) 35 assert output1 == output2, (output1, output2) 36 finally: 37 if verbose: 43 38 print "VALUE1:", value1 44 39 print "VALUE2:", value2 … … 47 42 print "OUTPUT2:" 48 43 print output2 49 raise50 44 51 TestRecursive.add_tests('testRecursive', '.recursive') 45 test_recursive.unittest = ['.recursive'] 52 46 47 if __name__ == '__main__': 48 import test_appliance 49 test_appliance.run(globals()) 50 -
pyyaml/trunk/tests/test_representer.py
r234 r322 1 1 2 import test_appliance 3 from test_constructor import * 2 import yaml 3 import test_constructor 4 import pprint 4 5 5 from yaml import * 6 def test_representer_types(code_filename, verbose=False): 7 test_constructor._make_objects() 8 for allow_unicode in [False, True]: 9 native1 = test_constructor._load_code(open(code_filename, 'rb').read()) 10 native2 = None 11 try: 12 output = yaml.dump(native1, Dumper=test_constructor.MyDumper, 13 allow_unicode=allow_unicode) 14 native2 = yaml.load(output, Loader=test_constructor.MyLoader) 15 try: 16 if native1 == native2: 17 continue 18 except TypeError: 19 pass 20 value1 = test_constructor._serialize_value(native1) 21 value2 = test_constructor._serialize_value(native2) 22 if verbose: 23 print "SERIALIZED NATIVE1:" 24 print value1 25 print "SERIALIZED NATIVE2:" 26 print value2 27 assert value1 == value2, (native1, native2) 28 finally: 29 if verbose: 30 print "NATIVE1:" 31 pprint.pprint(native1) 32 print "NATIVE2:" 33 pprint.pprint(native2) 34 print "OUTPUT:" 35 print output 6 36 7 class TestRepresenterTypes(test_appliance.TestAppliance): 37 test_representer_types.unittest = ['.code'] 8 38 9 def _testTypesUnicode(self, test_name, data_filename, code_filename): 10 return self._testTypes(test_name, data_filename, code_filename, allow_unicode=True) 39 if __name__ == '__main__': 40 import test_appliance 41 test_appliance.run(globals()) 11 42 12 def _testTypes(self, test_name, data_filename, code_filename, allow_unicode=False):13 data1 = eval(file(code_filename, 'rb').read())14 data2 = None15 output = None16 try:17 output = dump(data1, Dumper=MyDumper, allow_unicode=allow_unicode)18 data2 = load(output, Loader=MyLoader)19 self.failUnlessEqual(type(data1), type(data2))20 try:21 self.failUnlessEqual(data1, data2)22 except (AssertionError, TypeError):23 if isinstance(data1, dict):24 data1 = [(repr(key), value) for key, value in data1.items()]25 data1.sort()26 data1 = repr(data1)27 data2 = [(repr(key), value) for key, value in data2.items()]28 data2.sort()29 data2 = repr(data2)30 if data1 != data2:31 raise32 elif isinstance(data1, list):33 self.failUnlessEqual(type(data1), type(data2))34 self.failUnlessEqual(len(data1), len(data2))35 for item1, item2 in zip(data1, data2):36 if (item1 != item1 or (item1 == 0.0 and item1 == 1.0)) and \37 (item2 != item2 or (item2 == 0.0 and item2 == 1.0)):38 continue39 if isinstance(item1, datetime.datetime) \40 and isinstance(item2, datetime.datetime):41 self.failUnlessEqual(item1.microsecond,42 item2.microsecond)43 if isinstance(item1, datetime.datetime):44 item1 = item1.utctimetuple()45 if isinstance(item2, datetime.datetime):46 item2 = item2.utctimetuple()47 self.failUnlessEqual(item1, item2)48 else:49 raise50 except:51 print52 print "OUTPUT:"53 print output54 print "NATIVES1:", data155 print "NATIVES2:", data256 raise57 58 TestRepresenterTypes.add_tests('testTypes', '.data', '.code')59 TestRepresenterTypes.add_tests('testTypesUnicode', '.data', '.code')60 -
pyyaml/trunk/tests/test_resolver.py
r222 r322 1 1 2 import test_appliance 2 import yaml 3 import pprint 3 4 4 from yaml import * 5 def test_implicit_resolver(data_filename, detect_filename, verbose=False): 6 correct_tag = None 7 node = None 8 try: 9 correct_tag = open(detect_filename, 'rb').read().strip() 10 node = yaml.compose(open(data_filename, 'rb')) 11 assert isinstance(node, yaml.SequenceNode), node 12 for scalar in node.value: 13 assert isinstance(scalar, yaml.ScalarNode), scalar 14 assert scalar.tag == correct_tag, (scalar.tag, correct_tag) 15 finally: 16 if verbose: 17 print "CORRECT TAG:", correct_tag 18 if hasattr(node, 'value'): 19 print "CHILDREN:" 20 pprint.pprint(node.value) 5 21 6 class MyLoader(Loader): 7 pass 22 test_implicit_resolver.unittest = ['.data', '.detect'] 8 23 9 class MyDumper(Dumper):10 pass24 def _make_path_loader_and_dumper(): 25 global MyLoader, MyDumper 11 26 12 add_path_resolver(u'!root', [], 13 Loader=MyLoader, Dumper=MyDumper) 27 class MyLoader(yaml.Loader): 28 pass 29 class MyDumper(yaml.Dumper): 30 pass 14 31 15 add_path_resolver(u'!root/scalar', [], str, 16 Loader=MyLoader, Dumper=MyDumper) 32 yaml.add_path_resolver(u'!root', [], 33 Loader=MyLoader, Dumper=MyDumper) 34 yaml.add_path_resolver(u'!root/scalar', [], str, 35 Loader=MyLoader, Dumper=MyDumper) 36 yaml.add_path_resolver(u'!root/key11/key12/*', ['key11', 'key12'], 37 Loader=MyLoader, Dumper=MyDumper) 38 yaml.add_path_resolver(u'!root/key21/1/*', ['key21', 1], 39 Loader=MyLoader, Dumper=MyDumper) 40 yaml.add_path_resolver(u'!root/key31/*/*/key14/map', ['key31', None, None, 'key14'], dict, 41 Loader=MyLoader, Dumper=MyDumper) 17 42 18 add_path_resolver(u'!root/key11/key12/*', ['key11', 'key12'], 19 Loader=MyLoader, Dumper=MyDumper) 43 return MyLoader, MyDumper 20 44 21 add_path_resolver(u'!root/key21/1/*', ['key21', 1], 22 Loader=MyLoader, Dumper=MyDumper) 45 def _convert_node(node): 46 if isinstance(node, yaml.ScalarNode): 47 return (node.tag, node.value) 48 elif isinstance(node, yaml.SequenceNode): 49 value = [] 50 for item in node.value: 51 value.append(_convert_node(item)) 52 return (node.tag, value) 53 elif isinstance(node, yaml.MappingNode): 54 value = [] 55 for key, item in node.value: 56 value.append((_convert_node(key), _convert_node(item))) 57 return (node.tag, value) 23 58 24 add_path_resolver(u'!root/key31/*/*/key14/map', ['key31', None, None, 'key14'], dict, 25 Loader=MyLoader, Dumper=MyDumper) 59 def test_path_resolver_loader(data_filename, path_filename, verbose=False): 60 _make_path_loader_and_dumper() 61 nodes1 = list(yaml.compose_all(open(data_filename, 'rb').read(), Loader=MyLoader)) 62 nodes2 = list(yaml.compose_all(open(path_filename, 'rb').read())) 63 try: 64 for node1, node2 in zip(nodes1, nodes2): 65 data1 = _convert_node(node1) 66 data2 = _convert_node(node2) 67 assert data1 == data2, (data1, data2) 68 finally: 69 if verbose: 70 print yaml.serialize_all(nodes1) 26 71 27 class TestResolver(test_appliance.TestAppliance): 72 test_path_resolver_loader.unittest = ['.data', '.path'] 28 73 29 def _testImplicitResolver(self, test_name, data_filename, detect_filename): 30 node = None 31 correct_tag = None 32 try: 33 correct_tag = file(detect_filename, 'rb').read().strip() 34 node = compose(file(data_filename, 'rb')) 35 self.failUnless(isinstance(node, SequenceNode)) 36 for scalar in node.value: 37 self.failUnless(isinstance(scalar, ScalarNode)) 38 self.failUnlessEqual(scalar.tag, correct_tag) 39 except: 40 print 41 print "DATA:" 42 print file(data_filename, 'rb').read() 43 print "CORRECT_TAG:" 44 print file(detect_filename, 'rb').read() 45 print "ROOT NODE:", node 46 print "SCALAR NODES:", node.value 47 raise 74 def test_path_resolver_dumper(data_filename, path_filename, verbose=False): 75 _make_path_loader_and_dumper() 76 for filename in [data_filename, path_filename]: 77 output = yaml.serialize_all(yaml.compose_all(open(filename, 'rb')), Dumper=MyDumper) 78 if verbose: 79 print output 80 nodes1 = yaml.compose_all(output) 81 nodes2 = yaml.compose_all(open(data_filename, 'rb')) 82 for node1, node2 in zip(nodes1, nodes2): 83 data1 = _convert_node(node1) 84 data2 = _convert_node(node2) 85 assert data1 == data2, (data1, data2) 48 86 49 def _testPathResolverLoader(self, test_name, data_filename, path_filename): 50 #print serialize_all(compose_all(file(data_filename, 'rb').read(), Loader=MyLoader)) 51 nodes1 = compose_all(file(data_filename, 'rb').read(), Loader=MyLoader) 52 nodes2 = compose_all(file(path_filename, 'rb').read()) 53 for node1, node2 in zip(nodes1, nodes2): 54 self.failUnlessEqual(self._convert(node1), self._convert(node2)) 87 test_path_resolver_dumper.unittest = ['.data', '.path'] 55 88 56 def _testPathResolverDumper(self, test_name, data_filename, path_filename): 57 for filename in [data_filename, path_filename]: 58 output = serialize_all(compose_all(file(filename, 'rb').read()), Dumper=MyDumper) 59 #print output 60 nodes1 = compose_all(output) 61 nodes2 = compose_all(file(data_filename, 'rb').read()) 62 for node1, node2 in zip(nodes1, nodes2): 63 self.failUnlessEqual(self._convert(node1), self._convert(node2)) 89 if __name__ == '__main__': 90 import test_appliance 91 test_appliance.run(globals()) 64 92 65 def _convert(self, node):66 if isinstance(node, ScalarNode):67 return node.tag, node.value68 elif isinstance(node, SequenceNode):69 value = []70 for item in node.value:71 value.append(self._convert(item))72 return node.tag, value73 elif isinstance(node, MappingNode):74 value = []75 for key, item in node.value:76 value.append((self._convert(key), self._convert(item)))77 value.sort()78 return node.tag, value79 80 TestResolver.add_tests('testImplicitResolver', '.data', '.detect')81 TestResolver.add_tests('testPathResolverLoader', '.data', '.path')82 TestResolver.add_tests('testPathResolverDumper', '.data', '.path')83 -
pyyaml/trunk/tests/test_structure.py
r222 r322 1 1 2 import test_appliance 2 import yaml, canonical 3 import pprint 3 4 4 from yaml import * 5 def _convert_structure(loader): 6 if loader.check_event(yaml.ScalarEvent): 7 event = loader.get_event() 8 if event.tag or event.anchor or event.value: 9 return True 10 else: 11 return None 12 elif loader.check_event(yaml.SequenceStartEvent): 13 loader.get_event() 14 sequence = [] 15 while not loader.check_event(yaml.SequenceEndEvent): 16 sequence.append(_convert_structure(loader)) 17 loader.get_event() 18 return sequence 19 elif loader.check_event(yaml.MappingStartEvent): 20 loader.get_event() 21 mapping = [] 22 while not loader.check_event(yaml.MappingEndEvent): 23 key = _convert_structure(loader) 24 value = _convert_structure(loader) 25 mapping.append((key, value)) 26 loader.get_event() 27 return mapping 28 elif loader.check_event(yaml.AliasEvent): 29 loader.get_event() 30 return '*' 31 else: 32 loader.get_event() 33 return '?' 5 34 6 class TestStructure(test_appliance.TestAppliance): 35 def test_structure(data_filename, structure_filename, verbose=False): 36 nodes1 = [] 37 nodes2 = eval(open(structure_filename, 'rb').read()) 38 try: 39 loader = yaml.Loader(open(data_filename, 'rb')) 40 while loader.check_event(): 41 if loader.check_event(yaml.StreamStartEvent, yaml.StreamEndEvent, 42 yaml.DocumentStartEvent, yaml.DocumentEndEvent): 43 loader.get_event() 44 continue 45 nodes1.append(_convert_structure(loader)) 46 if len(nodes1) == 1: 47 nodes1 = nodes1[0] 48 assert nodes1 == nodes2, (nodes1, nodes2) 49 finally: 50 if verbose: 51 print "NODES1:" 52 pprint.pprint(nodes1) 53 print "NODES2:" 54 pprint.pprint(nodes2) 7 55 8 def _testStructure(self, test_name, data_filename, structure_filename): 9 node1 = None 10 node2 = eval(file(structure_filename, 'rb').read()) 11 try: 12 loader = Loader(file(data_filename, 'rb')) 13 node1 = [] 14 while not loader.check_event(StreamEndEvent): 15 if not loader.check_event(StreamStartEvent, DocumentStartEvent, DocumentEndEvent): 16 node1.append(self._convert(loader)) 17 else: 18 loader.get_event() 19 loader.get_event() 20 if len(node1) == 1: 21 node1 = node1[0] 22 self.failUnlessEqual(node1, node2) 23 except: 24 print 25 print "DATA:" 26 print file(data_filename, 'rb').read() 27 print "NODE1:", node1 28 print "NODE2:", node2 29 raise 56 test_structure.unittest = ['.data', '.structure'] 30 57 31 def _convert(self, loader): 32 if loader.check_event(ScalarEvent): 33 event = loader.get_event() 34 if event.tag or event.anchor or event.value: 35 return True 36 else: 37 return None 38 elif loader.check_event(SequenceStartEvent): 39 loader.get_event() 40 sequence = [] 41 while not loader.check_event(SequenceEndEvent): 42 sequence.append(self._convert(loader)) 43 loader.get_event() 44 return sequence 45 elif loader.check_event(MappingStartEvent): 46 loader.get_event() 47 mapping = [] 48 while not loader.check_event(MappingEndEvent): 49 key = self._convert(loader) 50 value = self._convert(loader) 51 mapping.append((key, value)) 52 loader.get_event() 53 return mapping 54 elif loader.check_event(AliasEvent): 55 loader.get_event() 56 return '*' 57 else: 58 loader.get_event() 59 return '?' 58 def _compare_events(events1, events2, full=False): 59 assert len(events1) == len(events2), (len(events1), len(events2)) 60 for event1, event2 in zip(events1, events2): 61 assert event1.__class__ == event2.__class__, (event1, event2) 62 if isinstance(event1, yaml.AliasEvent) and full: 63 assert event1.anchor == event2.anchor, (event1, event2) 64 if isinstance(event1, (yaml.ScalarEvent, yaml.CollectionStartEvent)): 65 if (event1.tag not in [None, u'!'] and event2.tag not in [None, u'!']) or full: 66 assert event1.tag == event2.tag, (event1, event2) 67 if isinstance(event1, yaml.ScalarEvent): 68 assert event1.value == event2.value, (event1, event2) 60 69 61 TestStructure.add_tests('testStructure', '.data', '.structure') 70 def test_parser(data_filename, canonical_filename, verbose=False): 71 events1 = None 72 events2 = None 73 try: 74 events1 = list(yaml.parse(open(data_filename, 'rb'))) 75 events2 = list(yaml.canonical_parse(open(canonical_filename, 'rb'))) 76 _compare_events(events1, events2) 77 finally: 78 if verbose: 79 print "EVENTS1:" 80 pprint.pprint(events1) 81 print "EVENTS2:" 82 pprint.pprint(events2) 62 83 63 class TestParser(test_appliance.TestAppliance): 84 test_parser.unittest = ['.data', '.canonical'] 64 85 65 def _testParser(self, test_name, data_filename, canonical_filename): 66 events1 = None 67 events2 = None 68 try: 69 events1 = list(parse(file(data_filename, 'rb'))) 70 events2 = list(test_appliance.canonical_parse(file(canonical_filename, 'rb'))) 71 self._compare(events1, events2) 72 except: 73 print 74 print "DATA1:" 75 print file(data_filename, 'rb').read() 76 print "DATA2:" 77 print file(canonical_filename, 'rb').read() 78 print "EVENTS1:", events1 79 print "EVENTS2:", events2 80 raise 86 def test_parser_on_canonical(canonical_filename, verbose=False): 87 events1 = None 88 events2 = None 89 try: 90 events1 = list(yaml.parse(open(canonical_filename, 'rb'))) 91 events2 = list(yaml.canonical_parse(open(canonical_filename, 'rb'))) 92 _compare_events(events1, events2, full=True) 93 finally: 94 if verbose: 95 print "EVENTS1:" 96 pprint.pprint(events1) 97 print "EVENTS2:" 98 pprint.pprint(events2) 81 99 82 def _compare(self, events1, events2): 83 self.failUnlessEqual(len(events1), len(events2)) 84 for event1, event2 in zip(events1, events2): 85 self.failUnlessEqual(event1.__class__, event2.__class__) 86 if isinstance(event1, AliasEvent): 87 #self.failUnlessEqual(event1.name, event2.name) 88 pass 89 elif isinstance(event1, ScalarEvent): 90 #self.failUnlessEqual(event1.anchor, event2.anchor) 91 #self.failUnlessEqual(event1.tag, event2.tag) 92 self.failUnlessEqual(event1.value, event2.value) 93 if isinstance(event1, CollectionStartEvent): 94 #self.failUnlessEqual(event1.anchor, event2.anchor) 95 #self.failUnlessEqual(event1.tag, event2.tag) 96 pass 100 test_parser_on_canonical.unittest = ['.canonical'] 97 101 98 TestParser.add_tests('testParser', '.data', '.canonical') 102 def _compare_nodes(node1, node2): 103 assert node1.__class__ == node2.__class__, (node1, node2) 104 assert node1.tag == node2.tag, (node1, node2) 105 if isinstance(node1, yaml.ScalarNode): 106 assert node1.value == node2.value, (node1, node2) 107 else: 108 assert len(node1.value) == len(node2.value), (node1, node2) 109 for item1, item2 in zip(node1.value, node2.value): 110 if not isinstance(item1, tuple): 111 item1 = (item1,) 112 item2 = (item2,) 113 for subnode1, subnode2 in zip(item1, item2): 114 _compare_nodes(subnode1, subnode2) 99 115 100 class TestResolver(test_appliance.TestAppliance): 116 def test_composer(data_filename, canonical_filename, verbose=False): 117 nodes1 = None 118 nodes2 = None 119 try: 120 nodes1 = list(yaml.compose_all(open(data_filename, 'rb'))) 121 nodes2 = list(yaml.canonical_compose_all(open(canonical_filename, 'rb'))) 122 assert len(nodes1) == len(nodes2), (len(nodes1), len(nodes2)) 123 for node1, node2 in zip(nodes1, nodes2): 124 _compare_nodes(node1, node2) 125 finally: 126 if verbose: 127 print "NODES1:" 128 pprint.pprint(nodes1) 129 print "NODES2:" 130 pprint.pprint(nodes2) 101 131 102 def _testResolver(self, test_name, data_filename, canonical_filename): 103 nodes1 = None 104 nodes2 = None 105 try: 106 nodes1 = list(compose_all(file(data_filename, 'rb'))) 107 nodes2 = list(test_appliance.canonical_compose_all(file(canonical_filename, 'rb'))) 108 self.failUnlessEqual(len(nodes1), len(nodes2)) 109 for node1, node2 in zip(nodes1, nodes2): 110 self._compare(node1, node2) 111 except: 112 print 113 print "DATA1:" 114 print file(data_filename, 'rb').read() 115 print "DATA2:" 116 print file(canonical_filename, 'rb').read() 117 print "NODES1:", nodes1 118 print "NODES2:", nodes2 119 raise 132 test_composer.unittest = ['.data', '.canonical'] 120 133 121 def _compare(self, node1, node2): 122 self.failUnlessEqual(node1.__class__, node2.__class__) 123 if isinstance(node1, ScalarNode): 124 #self.failUnlessEqual(node1.tag, node2.tag) 125 self.failUnlessEqual(node1.value, node2.value) 126 elif isinstance(node1, SequenceNode): 127 self.failUnlessEqual(len(node1.value), len(node2.value)) 128 for item1, item2 in zip(node1.value, node2.value): 129 self._compare(item1, item2) 130 elif isinstance(node1, MappingNode): 131 self.failUnlessEqual(len(node1.value), len(node2.value)) 132 items1 = node1.value.items() 133 items1.sort(lambda (k1,v1), (k2,v2): cmp((k1.tag,k1.value,v1.tag,v1.value), 134 (k2.tag,k2.value,v2.tag,v2.value))) 135 items2 = node2.value.items() 136 items2.sort(lambda (k1,v1), (k2,v2): cmp((k1.tag,k1.value,v1.tag,v1.value), 137 (k2.tag,k2.value,v2.tag,v2.value))) 138 for (key1, value1), (key2, value2) in zip(items1, items2): 139 self._compare(key1, key2) 140 self._compare(value1, value2) 134 def _make_loader(): 135 global MyLoader 141 136 142 TestResolver.add_tests('testResolver', '.data', '.canonical') 137 class MyLoader(yaml.Loader): 138 def construct_sequence(self, node): 139 return tuple(yaml.Loader.construct_sequence(self, node)) 140 def construct_mapping(self, node): 141 pairs = self.construct_pairs(node) 142 pairs.sort() 143 return pairs 144 def construct_undefined(self, node): 145 return self.construct_scalar(node) 143 146 144 class MyLoader(Loader): 145 def construct_sequence(self, node): 146 return tuple(Loader.construct_sequence(self, node)) 147 MyLoader.add_constructor(u'tag:yaml.org,2002:map', MyLoader.construct_mapping) 148 MyLoader.add_constructor(None, MyLoader.construct_undefined) 147 149 148 def construct_mapping(self, node): 149 pairs = self.construct_pairs(node) 150 pairs.sort() 151 return pairs 150 def _make_canonical_loader(): 151 global MyCanonicalLoader 152 152 153 def construct_undefined(self, node): 154 return self.construct_scalar(node) 153 class MyCanonicalLoader(yaml.CanonicalLoader): 154 def construct_sequence(self, node): 155 return tuple(yaml.CanonicalLoader.construct_sequence(self, node)) 156 def construct_mapping(self, node): 157 pairs = self.construct_pairs(node) 158 pairs.sort() 159 return pairs 160 def construct_undefined(self, node): 161 return self.construct_scalar(node) 155 162 156 MyLoader.add_constructor(u'tag:yaml.org,2002:map', MyLoader.construct_mapping)157 MyLoader.add_constructor(None, MyLoader.construct_undefined)163 MyCanonicalLoader.add_constructor(u'tag:yaml.org,2002:map', MyCanonicalLoader.construct_mapping) 164 MyCanonicalLoader.add_constructor(None, MyCanonicalLoader.construct_undefined) 158 165 159 class MyCanonicalLoader(test_appliance.CanonicalLoader): 166 def test_constructor(data_filename, canonical_filename, verbose=False): 167 _make_loader() 168 _make_canonical_loader() 169 native1 = None 170 native2 = None 171 try: 172 native1 = list(yaml.load_all(open(data_filename, 'rb'), Loader=MyLoader)) 173 native2 = list(yaml.load_all(open(canonical_filename, 'rb'), Loader=MyCanonicalLoader)) 174 assert native1 == native2, (native1, native2) 175 finally: 176 if verbose: 177 print "NATIVE1:" 178 pprint.pprint(native1) 179 print "NATIVE2:" 180 pprint.pprint(native2) 160 181 161 def construct_sequence(self, node): 162 return tuple(test_appliance.CanonicalLoader.construct_sequence(self, node)) 182 test_constructor.unittest = ['.data', '.canonical'] 163 183 164 def construct_mapping(self, node): 165 pairs = self.construct_pairs(node) 166 pairs.sort() 167 return pairs 184 if __name__ == '__main__': 185 import test_appliance 186 test_appliance.run(globals()) 168 187 169 def construct_undefined(self, node):170 return self.construct_scalar(node)171 172 MyCanonicalLoader.add_constructor(u'tag:yaml.org,2002:map', MyCanonicalLoader.construct_mapping)173 MyCanonicalLoader.add_constructor(None, MyCanonicalLoader.construct_undefined)174 175 class TestConstructor(test_appliance.TestAppliance):176 177 def _testConstructor(self, test_name, data_filename, canonical_filename):178 data1 = None179 data2 = None180 try:181 data1 = list(load_all(file(data_filename, 'rb'), Loader=MyLoader))182 data2 = list(load_all(file(canonical_filename, 'rb'), Loader=MyCanonicalLoader))183 self.failUnlessEqual(data1, data2)184 except:185 print186 print "DATA1:"187 print file(data_filename, 'rb').read()188 print "DATA2:"189 print file(canonical_filename, 'rb').read()190 print "NATIVES1:", data1191 print "NATIVES2:", data2192 raise193 194 TestConstructor.add_tests('testConstructor', '.data', '.canonical')195 196 class TestParserOnCanonical(test_appliance.TestAppliance):197 198 def _testParserOnCanonical(self, test_name, canonical_filename):199 events1 = None200 events2 = None201 try:202 events1 = list(parse(file(canonical_filename, 'rb')))203 events2 = list(test_appliance.canonical_parse(file(canonical_filename, 'rb')))204 self._compare(events1, events2)205 except:206 print207 print "DATA:"208 print file(canonical_filename, 'rb').read()209 print "EVENTS1:", events1210 print "EVENTS2:", events2211 raise212 213 def _compare(self, events1, events2):214 self.failUnlessEqual(len(events1), len(events2))215 for event1, event2 in zip(events1, events2):216 self.failUnlessEqual(event1.__class__, event2.__class__)217 if isinstance(event1, AliasEvent):218 self.failUnlessEqual(event1.anchor, event2.anchor)219 elif isinstance(event1, ScalarEvent):220 self.failUnlessEqual(event1.anchor, event2.anchor)221 self.failUnlessEqual(event1.tag, event2.tag)222 self.failUnlessEqual(event1.value, event2.value)223 if isinstance(event1, CollectionStartEvent):224 self.failUnlessEqual(event1.anchor, event2.anchor)225 self.failUnlessEqual(event1.tag, event2.tag)226 227 TestParserOnCanonical.add_tests('testParserOnCanonical', '.canonical')228 -
pyyaml/trunk/tests/test_tokens.py
r136 r322 1 1 2 import test_appliance 2 import yaml 3 import pprint 3 4 4 from yaml import * 5 # Tokens mnemonic: 6 # directive: % 7 # document_start: --- 8 # document_end: ... 9 # alias: * 10 # anchor: & 11 # tag: ! 12 # scalar _ 13 # block_sequence_start: [[ 14 # block_mapping_start: {{ 15 # block_end: ]} 16 # flow_sequence_start: [ 17 # flow_sequence_end: ] 18 # flow_mapping_start: { 19 # flow_mapping_end: } 20 # entry: , 21 # key: ? 22 # value: : 5 23 6 class TestTokens(test_appliance.TestAppliance): 24 _replaces = { 25 yaml.DirectiveToken: '%', 26 yaml.DocumentStartToken: '---', 27 yaml.DocumentEndToken: '...', 28 yaml.AliasToken: '*', 29 yaml.AnchorToken: '&', 30 yaml.TagToken: '!', 31 yaml.ScalarToken: '_', 32 yaml.BlockSequenceStartToken: '[[', 33 yaml.BlockMappingStartToken: '{{', 34 yaml.BlockEndToken: ']}', 35 yaml.FlowSequenceStartToken: '[', 36 yaml.FlowSequenceEndToken: ']', 37 yaml.FlowMappingStartToken: '{', 38 yaml.FlowMappingEndToken: '}', 39 yaml.BlockEntryToken: ',', 40 yaml.FlowEntryToken: ',', 41 yaml.KeyToken: '?', 42 yaml.ValueToken: ':', 43 } 7 44 8 # Tokens mnemonic: 9 # directive: % 10 # document_start: --- 11 # document_end: ... 12 # alias: * 13 # anchor: & 14 # tag: ! 15 # scalar _ 16 # block_sequence_start: [[ 17 # block_mapping_start: {{ 18 # block_end: ]} 19 # flow_sequence_start: [ 20 # flow_sequence_end: ] 21 # flow_mapping_start: { 22 # flow_mapping_end: } 23 # entry: , 24 # key: ? 25 # value: : 45 def test_tokens(data_filename, tokens_filename, verbose=False): 46 tokens1 = [] 47 tokens2 = open(tokens_filename, 'rb').read().split() 48 try: 49 for token in yaml.scan(open(data_filename, 'rb')): 50 if not isinstance(token, (yaml.StreamStartToken, yaml.StreamEndToken)): 51 tokens1.append(_replaces[token.__class__]) 52 finally: 53 if verbose: 54 print "TOKENS1:", ' '.join(tokens1) 55 print "TOKENS2:", ' '.join(tokens2) 56 assert len(tokens1) == len(tokens2), (tokens1, tokens2) 57 for token1, token2 in zip(tokens1, tokens2): 58 assert token1 == token2, (token1, token2) 26 59 27 replaces = { 28 DirectiveToken: '%', 29 DocumentStartToken: '---', 30 DocumentEndToken: '...', 31 AliasToken: '*', 32 AnchorToken: '&', 33 TagToken: '!', 34 ScalarToken: '_', 35 BlockSequenceStartToken: '[[', 36 BlockMappingStartToken: '{{', 37 BlockEndToken: ']}', 38 FlowSequenceStartToken: '[', 39 FlowSequenceEndToken: ']', 40 FlowMappingStartToken: '{', 41 FlowMappingEndToken: '}', 42 BlockEntryToken: ',', 43 FlowEntryToken: ',', 44 KeyToken: '?', 45 ValueToken: ':', 46 } 60 test_tokens.unittest = ['.data', '.tokens'] 47 61 48 def _testTokens(self, test_name, data_filename, tokens_filename):49 tokens1 = None50 tokens 2 = file(tokens_filename, 'rb').read().split()62 def test_scanner(data_filename, canonical_filename, verbose=False): 63 for filename in [data_filename, canonical_filename]: 64 tokens = [] 51 65 try: 52 tokens1 = [] 53 for token in scan(file(data_filename, 'rb')): 54 if not isinstance(token, (StreamStartToken, StreamEndToken)): 55 tokens1.append(token) 56 tokens1 = [self.replaces[t.__class__] for t in tokens1] 57 self.failUnlessEqual(tokens1, tokens2) 58 except: 59 print 60 print "DATA:" 61 print file(data_filename, 'rb').read() 62 print "TOKENS1:", tokens1 63 print "TOKENS2:", tokens2 64 raise 66 for token in yaml.scan(open(filename, 'rb')): 67 tokens.append(token.__class__.__name__) 68 finally: 69 if verbose: 70 pprint.pprint(tokens) 65 71 66 TestTokens.add_tests('testTokens', '.data', '.tokens') 72 test_scanner.unittest = ['.data', '.canonical'] 67 73 68 class TestScanner(test_appliance.TestAppliance): 74 if __name__ == '__main__': 75 import test_appliance 76 test_appliance.run(globals()) 69 77 70 def _testScanner(self, test_name, data_filename, canonical_filename):71 for filename in [canonical_filename, data_filename]:72 tokens = None73 try:74 tokens = []75 for token in scan(file(filename, 'rb')):76 if not isinstance(token, (StreamStartToken, StreamEndToken)):77 tokens.append(token.__class__.__name__)78 except:79 print80 print "DATA:"81 print file(data_filename, 'rb').read()82 print "TOKENS:", tokens83 raise84 85 TestScanner.add_tests('testScanner', '.data', '.canonical')86 -
pyyaml/trunk/tests/test_yaml.py
r142 r322 1 2 import unittest3 1 4 2 from test_mark import * … … 13 11 from test_representer import * 14 12 from test_recursive import * 15 from test_syck import *16 17 def main(module='__main__'):18 unittest.main(module)19 13 20 14 if __name__ == '__main__': 21 main() 15 import test_appliance 16 test_appliance.run(globals()) 22 17 -
pyyaml/trunk/tests/test_yaml_ext.py
r312 r322 1 2 import unittest, test_appliance3 1 4 2 import _yaml, yaml 5 6 test_appliance.TestAppliance.SKIP_EXT = '.skip-ext' 7 8 class TestCVersion(unittest.TestCase): 9 10 def testCVersion(self): 11 self.failUnlessEqual("%s.%s.%s" % _yaml.get_version(), _yaml.get_version_string()) 12 13 class TestCLoader(test_appliance.TestAppliance): 14 15 def _testCScannerFileInput(self, test_name, data_filename, canonical_filename): 16 self._testCScanner(test_name, data_filename, canonical_filename, True) 17 18 def _testCScanner(self, test_name, data_filename, canonical_filename, file_input=False, Loader=yaml.Loader): 19 if file_input: 20 data = file(data_filename, 'r') 21 else: 22 data = file(data_filename, 'r').read() 23 tokens = list(yaml.scan(data, Loader=Loader)) 24 ext_tokens = [] 3 import types, pprint 4 5 yaml.PyBaseLoader = yaml.BaseLoader 6 yaml.PySafeLoader = yaml.SafeLoader 7 yaml.PyLoader = yaml.Loader 8 yaml.PyBaseDumper = yaml.BaseDumper 9 yaml.PySafeDumper = yaml.SafeDumper 10 yaml.PyDumper = yaml.Dumper 11 12 old_scan = yaml.scan 13 def new_scan(stream, Loader=yaml.CLoader): 14 return old_scan(stream, Loader) 15 16 old_parse = yaml.parse 17 def new_parse(stream, Loader=yaml.CLoader): 18 return old_parse(stream, Loader) 19 20 old_compose = yaml.compose 21 def new_compose(stream, Loader=yaml.CLoader): 22 return old_compose(stream, Loader) 23 24 old_compose_all = yaml.compose_all 25 def new_compose_all(stream, Loader=yaml.CLoader): 26 return old_compose_all(stream, Loader) 27 28 old_load = yaml.load 29 def new_load(stream, Loader=yaml.CLoader): 30 return old_load(stream, Loader) 31 32 old_load_all = yaml.load_all 33 def new_load_all(stream, Loader=yaml.CLoader): 34 return old_load_all(stream, Loader) 35 36 old_safe_load = yaml.safe_load 37 def new_safe_load(stream): 38 return old_load(stream, yaml.CSafeLoader) 39 40 old_safe_load_all = yaml.safe_load_all 41 def new_safe_load_all(stream): 42 return old_load_all(stream, yaml.CSafeLoader) 43 44 old_emit = yaml.emit 45 def new_emit(events, stream=None, Dumper=yaml.CDumper, **kwds): 46 return old_emit(events, stream, Dumper, **kwds) 47 48 old_serialize = yaml.serialize 49 def new_serialize(node, stream, Dumper=yaml.CDumper, **kwds): 50 return old_serialize(node, stream, Dumper, **kwds) 51 52 old_serialize_all = yaml.serialize_all 53 def new_serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds): 54 return old_serialize_all(nodes, stream, Dumper, **kwds) 55 56 old_dump = yaml.dump 57 def new_dump(data, stream=None, Dumper=yaml.CDumper, **kwds): 58 return old_dump(data, stream, Dumper, **kwds) 59 60 old_dump_all = yaml.dump_all 61 def new_dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds): 62 return old_dump_all(documents, stream, Dumper, **kwds) 63 64 old_safe_dump = yaml.safe_dump 65 def new_safe_dump(data, stream=None, **kwds): 66 return old_dump(data, stream, yaml.CSafeDumper, **kwds) 67 68 old_safe_dump_all = yaml.safe_dump_all 69 def new_safe_dump_all(documents, stream=None, **kwds): 70 return old_dump_all(documents, stream, yaml.CSafeDumper, **kwds) 71 72 def _set_up(): 73 yaml.BaseLoader = yaml.CBaseLoader 74 yaml.SafeLoader = yaml.CSafeLoader 75 yaml.Loader = yaml.CLoader 76 yaml.BaseDumper = yaml.CBaseDumper 77 yaml.SafeDumper = yaml.CSafeDumper 78 yaml.Dumper = yaml.CDumper 79 yaml.scan = new_scan 80 yaml.parse = new_parse 81 yaml.compose = new_compose 82 yaml.compose_all = new_compose_all 83 yaml.load = new_load 84 yaml.load_all = new_load_all 85 yaml.safe_load = new_safe_load 86 yaml.safe_load_all = new_safe_load_all 87 yaml.emit = new_emit 88 yaml.serialize = new_serialize 89 yaml.serialize_all = new_serialize_all 90 yaml.dump = new_dump 91 yaml.dump_all = new_dump_all 92 yaml.safe_dump = new_safe_dump 93 yaml.safe_dump_all = new_safe_dump_all 94 95 def _tear_down(): 96 yaml.BaseLoader = yaml.PyBaseLoader 97 yaml.SafeLoader = yaml.PySafeLoader 98 yaml.Loader = yaml.PyLoader 99 yaml.BaseDumper = yaml.PyBaseDumper 100 yaml.SafeDumper = yaml.PySafeDumper 101 yaml.Dumper = yaml.PyDumper 102 yaml.scan = old_scan 103 yaml.parse = old_parse 104 yaml.compose = old_compose 105 yaml.compose_all = old_compose_all 106 yaml.load = old_load 107 yaml.load_all = old_load_all 108 yaml.safe_load = old_safe_load 109 yaml.safe_load_all = old_safe_load_all 110 yaml.emit = old_emit 111 yaml.serialize = old_serialize 112 yaml.serialize_all = old_serialize_all 113 yaml.dump = old_dump 114 yaml.dump_all = old_dump_all 115 yaml.safe_dump = old_safe_dump 116 yaml.safe_dump_all = old_safe_dump_all 117 118 def test_c_version(verbose=False): 119 if verbose: 120 print _yaml.get_version() 121 print _yaml.get_version_string() 122 assert ("%s.%s.%s" % _yaml.get_version()) == _yaml.get_version_string(), \ 123 (_yaml.get_version(), _yaml.get_version_string()) 124 125 def _compare_scanners(py_data, c_data, verbose): 126 py_tokens = list(yaml.scan(py_data, Loader=yaml.PyLoader)) 127 c_tokens = [] 128 try: 129 for token in yaml.scan(c_data, Loader=yaml.CLoader): 130 c_tokens.append(token) 131 assert len(py_tokens) == len(c_tokens), (len(py_tokens), len(c_tokens)) 132 for py_token, c_token in zip(py_tokens, c_tokens): 133 assert py_token.__class__ == c_token.__class__, (py_token, c_token) 134 if hasattr(py_token, 'value'): 135 assert py_token.value == c_token.value, (py_token, c_token) 136 if isinstance(py_token, yaml.StreamEndToken): 137 continue 138 py_start = (py_token.start_mark.index, py_token.start_mark.line, py_token.start_mark.column) 139 py_end = (py_token.end_mark.index, py_token.end_mark.line, py_token.end_mark.column) 140 c_start = (c_token.start_mark.index, c_token.start_mark.line, c_token.start_mark.column) 141 c_end = (c_token.end_mark.index, c_token.end_mark.line, c_token.end_mark.column) 142 assert py_start == c_start, (py_start, c_start) 143 assert py_end == c_end, (py_end, c_end) 144 finally: 145 if verbose: 146 print "PY_TOKENS:" 147 pprint.pprint(py_tokens) 148 print "C_TOKENS:" 149 pprint.pprint(c_tokens) 150 151 def test_c_scanner(data_filename, canonical_filename, verbose=False): 152 _compare_scanners(open(data_filename, 'rb'), 153 open(data_filename, 'rb'), verbose) 154 _compare_scanners(open(data_filename, 'rb').read(), 155 open(data_filename, 'rb').read(), verbose) 156 _compare_scanners(open(canonical_filename, 'rb'), 157 open(canonical_filename, 'rb'), verbose) 158 _compare_scanners(open(canonical_filename, 'rb').read(), 159 open(canonical_filename, 'rb').read(), verbose) 160 161 test_c_scanner.unittest = ['.data', '.canonical'] 162 test_c_scanner.skip = ['.skip-ext'] 163 164 def _compare_parsers(py_data, c_data, verbose): 165 py_events = list(yaml.parse(py_data, Loader=yaml.PyLoader)) 166 c_events = [] 167 try: 168 for event in yaml.parse(c_data, Loader=yaml.CLoader): 169 c_events.append(event) 170 assert len(py_events) == len(c_events), (len(py_events), len(c_events)) 171 for py_event, c_event in zip(py_events, c_events): 172 for attribute in ['__class__', 'anchor', 'tag', 'implicit', 173 'value', 'explicit', 'version', 'tags']: 174 py_value = getattr(py_event, attribute, None) 175 c_value = getattr(c_event, attribute, None) 176 assert py_value == c_value, (py_event, c_event, attribute) 177 finally: 178 if verbose: 179 print "PY_EVENTS:" 180 pprint.pprint(py_events) 181 print "C_EVENTS:" 182 pprint.pprint(c_events) 183 184 def test_c_parser(data_filename, canonical_filename, verbose=False): 185 _compare_parsers(open(data_filename, 'rb'), 186 open(data_filename, 'rb'), verbose) 187 _compare_parsers(open(data_filename, 'rb').read(), 188 open(data_filename, 'rb').read(), verbose) 189 _compare_parsers(open(canonical_filename, 'rb'), 190 open(canonical_filename, 'rb'), verbose) 191 _compare_parsers(open(canonical_filename, 'rb').read(), 192 open(canonical_filename, 'rb').read(), verbose) 193 194 test_c_parser.unittest = ['.data', '.canonical'] 195 test_c_parser.skip = ['.skip-ext'] 196 197 def _compare_emitters(data, verbose): 198 events = list(yaml.parse(data, Loader=yaml.PyLoader)) 199 c_data = yaml.emit(events, Dumper=yaml.CDumper) 200 if verbose: 201 print c_data 202 py_events = list(yaml.parse(c_data, Loader=yaml.PyLoader)) 203 c_events = list(yaml.parse(c_data, Loader=yaml.CLoader)) 204 try: 205 assert len(events) == len(py_events), (len(events), len(py_events)) 206 assert len(events) == len(c_events), (len(events), len(c_events)) 207 for event, py_event, c_event in zip(events, py_events, c_events): 208 for attribute in ['__class__', 'anchor', 'tag', 'implicit', 209 'value', 'explicit', 'version', 'tags']: 210 value = getattr(event, attribute, None) 211 py_value = getattr(py_event, attribute, None) 212 c_value = getattr(c_event, attribute, None) 213 if attribute == 'tag' and value in [None, u'!'] \ 214 and py_value in [None, u'!'] and c_value in [None, u'!']: 215 continue 216 if attribute == 'explicit' and (py_value or c_value): 217 continue 218 assert value == py_value, (event, py_event, attribute) 219 assert value == c_value, (event, c_event, attribute) 220 finally: 221 if verbose: 222 print "EVENTS:" 223 pprint.pprint(events) 224 print "PY_EVENTS:" 225 pprint.pprint(py_events) 226 print "C_EVENTS:" 227 pprint.pprint(c_events) 228 229 def test_c_emitter(data_filename, canonical_filename, verbose=False): 230 _compare_emitters(open(data_filename, 'rb').read(), verbose) 231 _compare_emitters(open(canonical_filename, 'rb').read(), verbose) 232 233 test_c_emitter.unittest = ['.data', '.canonical'] 234 test_c_emitter.skip = ['.skip-ext'] 235 236 def wrap_ext_function(function): 237 def wrapper(*args, **kwds): 238 _set_up() 25 239 try: 26 if file_input: 27 data = file(data_filename, 'r') 28 for token in yaml.scan(data, Loader=yaml.CLoader): 29 ext_tokens.append(token) 30 self.failUnlessEqual(len(tokens), len(ext_tokens)) 31 for token, ext_token in zip(tokens, ext_tokens): 32 self.failUnlessEqual(token.__class__, ext_token.__class__) 33 if not isinstance(token, yaml.StreamEndToken): 34 self.failUnlessEqual((token.start_mark.index, token.start_mark.line, token.start_mark.column), 35 (ext_token.start_mark.index, ext_token.start_mark.line, ext_token.start_mark.column)) 36 self.failUnlessEqual((token.end_mark.index, token.end_mark.line, token.end_mark.column), 37 (ext_token.end_mark.index, ext_token.end_mark.line, ext_token.end_mark.column)) 38 if hasattr(token, 'value'): 39 self.failUnlessEqual(token.value, ext_token.value) 40 except: 41 print 42 print "DATA:" 43 print file(data_filename, 'rb').read() 44 print "TOKENS:", tokens 45 print "EXT_TOKENS:", ext_tokens 46 raise 47 48 def _testCParser(self, test_name, data_filename, canonical_filename, Loader=yaml.Loader): 49 data = file(data_filename, 'r').read() 50 events = list(yaml.parse(data, Loader=Loader)) 51 ext_events = [] 52 try: 53 for event in yaml.parse(data, Loader=yaml.CLoader): 54 ext_events.append(event) 55 #print "EVENT:", event 56 self.failUnlessEqual(len(events), len(ext_events)) 57 for event, ext_event in zip(events, ext_events): 58 self.failUnlessEqual(event.__class__, ext_event.__class__) 59 if hasattr(event, 'anchor'): 60 self.failUnlessEqual(event.anchor, ext_event.anchor) 61 if hasattr(event, 'tag'): 62 self.failUnlessEqual(event.tag, ext_event.tag) 63 if hasattr(event, 'implicit'): 64 self.failUnlessEqual(event.implicit, ext_event.implicit) 65 if hasattr(event, 'value'): 66 self.failUnlessEqual(event.value, ext_event.value) 67 if hasattr(event, 'explicit'): 68 self.failUnlessEqual(event.explicit, ext_event.explicit) 69 if hasattr(event, 'version'): 70 self.failUnlessEqual(event.version, ext_event.version) 71 if hasattr(event, 'tags'): 72 self.failUnlessEqual(event.tags, ext_event.tags) 73 except: 74 print 75 print "DATA:" 76 print file(data_filename, 'rb').read() 77 print "EVENTS:", events 78 print "EXT_EVENTS:", ext_events 79 raise 80 81 TestCLoader.add_tests('testCScanner', '.data', '.canonical') 82 TestCLoader.add_tests('testCScannerFileInput', '.data', '.canonical') 83 TestCLoader.add_tests('testCParser', '.data', '.canonical') 84 85 class TestCEmitter(test_appliance.TestAppliance): 86 87 def _testCEmitter(self, test_name, data_filename, canonical_filename, Loader=yaml.Loader): 88 data1 = file(data_filename, 'r').read() 89 events = list(yaml.parse(data1, Loader=Loader)) 90 data2 = yaml.emit(events, Dumper=yaml.CDumper) 91 ext_events = [] 92 try: 93 for event in yaml.parse(data2): 94 ext_events.append(event) 95 self.failUnlessEqual(len(events), len(ext_events)) 96 for event, ext_event in zip(events, ext_events): 97 self.failUnlessEqual(event.__class__, ext_event.__class__) 98 if hasattr(event, 'anchor'): 99 self.failUnlessEqual(event.anchor, ext_event.anchor) 100 if hasattr(event, 'tag'): 101 if not (event.tag in ['!', None] and ext_event.tag in ['!', None]): 102 self.failUnlessEqual(event.tag, ext_event.tag) 103 if hasattr(event, 'implicit'): 104 self.failUnlessEqual(event.implicit, ext_event.implicit) 105 if hasattr(event, 'value'): 106 self.failUnlessEqual(event.value, ext_event.value) 107 if hasattr(event, 'explicit') and event.explicit: 108 self.failUnlessEqual(event.explicit, ext_event.explicit) 109 if hasattr(event, 'version'): 110 self.failUnlessEqual(event.version, ext_event.version) 111 if hasattr(event, 'tags'): 112 self.failUnlessEqual(event.tags, ext_event.tags) 113 except: 114 print 115 print "DATA1:" 116 print data1 117 print "DATA2:" 118 print data2 119 print "EVENTS:", events 120 print "EXT_EVENTS:", ext_events 121 raise 122 123 TestCEmitter.add_tests('testCEmitter', '.data', '.canonical') 124 125 yaml.BaseLoader = yaml.CBaseLoader 126 yaml.SafeLoader = yaml.CSafeLoader 127 yaml.Loader = yaml.CLoader 128 yaml.BaseDumper = yaml.CBaseDumper 129 yaml.SafeDumper = yaml.CSafeDumper 130 yaml.Dumper = yaml.CDumper 131 old_scan = yaml.scan 132 def scan(stream, Loader=yaml.CLoader): 133 return old_scan(stream, Loader) 134 yaml.scan = scan 135 old_parse = yaml.parse 136 def parse(stream, Loader=yaml.CLoader): 137 return old_parse(stream, Loader) 138 yaml.parse = parse 139 old_compose = yaml.compose 140 def compose(stream, Loader=yaml.CLoader): 141 return old_compose(stream, Loader) 142 yaml.compose = compose 143 old_compose_all = yaml.compose_all 144 def compose_all(stream, Loader=yaml.CLoader): 145 return old_compose_all(stream, Loader) 146 yaml.compose_all = compose_all 147 old_load_all = yaml.load_all 148 def load_all(stream, Loader=yaml.CLoader): 149 return old_load_all(stream, Loader) 150 yaml.load_all = load_all 151 old_load = yaml.load 152 def load(stream, Loader=yaml.CLoader): 153 return old_load(stream, Loader) 154 yaml.load = load 155 def safe_load_all(stream): 156 return yaml.load_all(stream, yaml.CSafeLoader) 157 yaml.safe_load_all = safe_load_all 158 def safe_load(stream): 159 return yaml.load(stream, yaml.CSafeLoader) 160 yaml.safe_load = safe_load 161 old_emit = yaml.emit 162 def emit(events, stream=None, Dumper=yaml.CDumper, **kwds): 163 return old_emit(events, stream, Dumper, **kwds) 164 yaml.emit = emit 165 old_serialize_all = yaml.serialize_all 166 def serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds): 167 return old_serialize_all(nodes, stream, Dumper, **kwds) 168 yaml.serialize_all = serialize_all 169 old_serialize = yaml.serialize 170 def serialize(node, stream, Dumper=yaml.CDumper, **kwds): 171 return old_serialize(node, stream, Dumper, **kwds) 172 yaml.serialize = serialize 173 old_dump_all = yaml.dump_all 174 def dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds): 175 return old_dump_all(documents, stream, Dumper, **kwds) 176 yaml.dump_all = dump_all 177 old_dump = yaml.dump 178 def dump(data, stream=None, Dumper=yaml.CDumper, **kwds): 179 return old_dump(data, stream, Dumper, **kwds) 180 yaml.dump = dump 181 def safe_dump_all(documents, stream=None, **kwds): 182 return yaml.dump_all(documents, stream, yaml.CSafeDumper, **kwds) 183 yaml.safe_dump_all = safe_dump_all 184 def safe_dump(data, stream=None, **kwds): 185 return yaml.dump(data, stream, yaml.CSafeDumper, **kwds) 186 yaml.safe_dump = safe_dump 187 188 from test_yaml import * 189 190 def main(module='__main__'): 191 unittest.main(module) 240 function(*args, **kwds) 241 finally: 242 _tear_down() 243 wrapper.func_name = '%s_ext' % function.func_name 244 wrapper.unittest = function.unittest 245 wrapper.skip = getattr(function, 'skip', [])+['.skip-ext'] 246 return wrapper 247 248 def wrap_ext(collections): 249 functions = [] 250 if not isinstance(collections, list): 251 collections = [collections] 252 for collection in collections: 253 if not isinstance(collection, dict): 254 collection = vars(collection) 255 keys = collection.keys() 256 keys.sort() 257 for key in keys: 258 value = collection[key] 259 if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'): 260 functions.append(wrap_ext_function(value)) 261 for function in functions: 262 assert function.func_name not in globals() 263 globals()[function.func_name] = function 264 265 import test_tokens, test_structure, test_errors, test_resolver, test_constructor, \ 266 test_emitter, test_representer, test_recursive 267 wrap_ext([test_tokens, test_structure, test_errors, test_resolver, test_constructor, 268 test_emitter, test_representer, test_recursive]) 192 269 193 270 if __name__ == '__main__': 194 main() 195 271 import test_appliance 272 test_appliance.run(globals()) 273
