| 4 | | from yaml import * |
| | 5 | def _convert_structure(loader): |
| | 6 | if loader.check_event(yaml.ScalarEvent): |
| | 7 | event = loader.get_event() |
| | 8 | if event.tag or event.anchor or event.value: |
| | 9 | return True |
| | 10 | else: |
| | 11 | return None |
| | 12 | elif loader.check_event(yaml.SequenceStartEvent): |
| | 13 | loader.get_event() |
| | 14 | sequence = [] |
| | 15 | while not loader.check_event(yaml.SequenceEndEvent): |
| | 16 | sequence.append(_convert_structure(loader)) |
| | 17 | loader.get_event() |
| | 18 | return sequence |
| | 19 | elif loader.check_event(yaml.MappingStartEvent): |
| | 20 | loader.get_event() |
| | 21 | mapping = [] |
| | 22 | while not loader.check_event(yaml.MappingEndEvent): |
| | 23 | key = _convert_structure(loader) |
| | 24 | value = _convert_structure(loader) |
| | 25 | mapping.append((key, value)) |
| | 26 | loader.get_event() |
| | 27 | return mapping |
| | 28 | elif loader.check_event(yaml.AliasEvent): |
| | 29 | loader.get_event() |
| | 30 | return '*' |
| | 31 | else: |
| | 32 | loader.get_event() |
| | 33 | return '?' |
| 6 | | class TestStructure(test_appliance.TestAppliance): |
| | 35 | def test_structure(data_filename, structure_filename, verbose=False): |
| | 36 | nodes1 = [] |
| | 37 | nodes2 = eval(open(structure_filename, 'rb').read()) |
| | 38 | try: |
| | 39 | loader = yaml.Loader(open(data_filename, 'rb')) |
| | 40 | while loader.check_event(): |
| | 41 | if loader.check_event(yaml.StreamStartEvent, yaml.StreamEndEvent, |
| | 42 | yaml.DocumentStartEvent, yaml.DocumentEndEvent): |
| | 43 | loader.get_event() |
| | 44 | continue |
| | 45 | nodes1.append(_convert_structure(loader)) |
| | 46 | if len(nodes1) == 1: |
| | 47 | nodes1 = nodes1[0] |
| | 48 | assert nodes1 == nodes2, (nodes1, nodes2) |
| | 49 | finally: |
| | 50 | if verbose: |
| | 51 | print "NODES1:" |
| | 52 | pprint.pprint(nodes1) |
| | 53 | print "NODES2:" |
| | 54 | pprint.pprint(nodes2) |
| 8 | | def _testStructure(self, test_name, data_filename, structure_filename): |
| 9 | | node1 = None |
| 10 | | node2 = eval(file(structure_filename, 'rb').read()) |
| 11 | | try: |
| 12 | | loader = Loader(file(data_filename, 'rb')) |
| 13 | | node1 = [] |
| 14 | | while not loader.check_event(StreamEndEvent): |
| 15 | | if not loader.check_event(StreamStartEvent, DocumentStartEvent, DocumentEndEvent): |
| 16 | | node1.append(self._convert(loader)) |
| 17 | | else: |
| 18 | | loader.get_event() |
| 19 | | loader.get_event() |
| 20 | | if len(node1) == 1: |
| 21 | | node1 = node1[0] |
| 22 | | self.failUnlessEqual(node1, node2) |
| 23 | | except: |
| 24 | | print |
| 25 | | print "DATA:" |
| 26 | | print file(data_filename, 'rb').read() |
| 27 | | print "NODE1:", node1 |
| 28 | | print "NODE2:", node2 |
| 29 | | raise |
| | 56 | test_structure.unittest = ['.data', '.structure'] |
| 31 | | def _convert(self, loader): |
| 32 | | if loader.check_event(ScalarEvent): |
| 33 | | event = loader.get_event() |
| 34 | | if event.tag or event.anchor or event.value: |
| 35 | | return True |
| 36 | | else: |
| 37 | | return None |
| 38 | | elif loader.check_event(SequenceStartEvent): |
| 39 | | loader.get_event() |
| 40 | | sequence = [] |
| 41 | | while not loader.check_event(SequenceEndEvent): |
| 42 | | sequence.append(self._convert(loader)) |
| 43 | | loader.get_event() |
| 44 | | return sequence |
| 45 | | elif loader.check_event(MappingStartEvent): |
| 46 | | loader.get_event() |
| 47 | | mapping = [] |
| 48 | | while not loader.check_event(MappingEndEvent): |
| 49 | | key = self._convert(loader) |
| 50 | | value = self._convert(loader) |
| 51 | | mapping.append((key, value)) |
| 52 | | loader.get_event() |
| 53 | | return mapping |
| 54 | | elif loader.check_event(AliasEvent): |
| 55 | | loader.get_event() |
| 56 | | return '*' |
| 57 | | else: |
| 58 | | loader.get_event() |
| 59 | | return '?' |
| | 58 | def _compare_events(events1, events2, full=False): |
| | 59 | assert len(events1) == len(events2), (len(events1), len(events2)) |
| | 60 | for event1, event2 in zip(events1, events2): |
| | 61 | assert event1.__class__ == event2.__class__, (event1, event2) |
| | 62 | if isinstance(event1, yaml.AliasEvent) and full: |
| | 63 | assert event1.anchor == event2.anchor, (event1, event2) |
| | 64 | if isinstance(event1, (yaml.ScalarEvent, yaml.CollectionStartEvent)): |
| | 65 | if (event1.tag not in [None, u'!'] and event2.tag not in [None, u'!']) or full: |
| | 66 | assert event1.tag == event2.tag, (event1, event2) |
| | 67 | if isinstance(event1, yaml.ScalarEvent): |
| | 68 | assert event1.value == event2.value, (event1, event2) |
| 65 | | def _testParser(self, test_name, data_filename, canonical_filename): |
| 66 | | events1 = None |
| 67 | | events2 = None |
| 68 | | try: |
| 69 | | events1 = list(parse(file(data_filename, 'rb'))) |
| 70 | | events2 = list(test_appliance.canonical_parse(file(canonical_filename, 'rb'))) |
| 71 | | self._compare(events1, events2) |
| 72 | | except: |
| 73 | | print |
| 74 | | print "DATA1:" |
| 75 | | print file(data_filename, 'rb').read() |
| 76 | | print "DATA2:" |
| 77 | | print file(canonical_filename, 'rb').read() |
| 78 | | print "EVENTS1:", events1 |
| 79 | | print "EVENTS2:", events2 |
| 80 | | raise |
| | 86 | def test_parser_on_canonical(canonical_filename, verbose=False): |
| | 87 | events1 = None |
| | 88 | events2 = None |
| | 89 | try: |
| | 90 | events1 = list(yaml.parse(open(canonical_filename, 'rb'))) |
| | 91 | events2 = list(yaml.canonical_parse(open(canonical_filename, 'rb'))) |
| | 92 | _compare_events(events1, events2, full=True) |
| | 93 | finally: |
| | 94 | if verbose: |
| | 95 | print "EVENTS1:" |
| | 96 | pprint.pprint(events1) |
| | 97 | print "EVENTS2:" |
| | 98 | pprint.pprint(events2) |
| 82 | | def _compare(self, events1, events2): |
| 83 | | self.failUnlessEqual(len(events1), len(events2)) |
| 84 | | for event1, event2 in zip(events1, events2): |
| 85 | | self.failUnlessEqual(event1.__class__, event2.__class__) |
| 86 | | if isinstance(event1, AliasEvent): |
| 87 | | #self.failUnlessEqual(event1.name, event2.name) |
| 88 | | pass |
| 89 | | elif isinstance(event1, ScalarEvent): |
| 90 | | #self.failUnlessEqual(event1.anchor, event2.anchor) |
| 91 | | #self.failUnlessEqual(event1.tag, event2.tag) |
| 92 | | self.failUnlessEqual(event1.value, event2.value) |
| 93 | | if isinstance(event1, CollectionStartEvent): |
| 94 | | #self.failUnlessEqual(event1.anchor, event2.anchor) |
| 95 | | #self.failUnlessEqual(event1.tag, event2.tag) |
| 96 | | pass |
| | 100 | test_parser_on_canonical.unittest = ['.canonical'] |
| 98 | | TestParser.add_tests('testParser', '.data', '.canonical') |
| | 102 | def _compare_nodes(node1, node2): |
| | 103 | assert node1.__class__ == node2.__class__, (node1, node2) |
| | 104 | assert node1.tag == node2.tag, (node1, node2) |
| | 105 | if isinstance(node1, yaml.ScalarNode): |
| | 106 | assert node1.value == node2.value, (node1, node2) |
| | 107 | else: |
| | 108 | assert len(node1.value) == len(node2.value), (node1, node2) |
| | 109 | for item1, item2 in zip(node1.value, node2.value): |
| | 110 | if not isinstance(item1, tuple): |
| | 111 | item1 = (item1,) |
| | 112 | item2 = (item2,) |
| | 113 | for subnode1, subnode2 in zip(item1, item2): |
| | 114 | _compare_nodes(subnode1, subnode2) |
| 100 | | class TestResolver(test_appliance.TestAppliance): |
| | 116 | def test_composer(data_filename, canonical_filename, verbose=False): |
| | 117 | nodes1 = None |
| | 118 | nodes2 = None |
| | 119 | try: |
| | 120 | nodes1 = list(yaml.compose_all(open(data_filename, 'rb'))) |
| | 121 | nodes2 = list(yaml.canonical_compose_all(open(canonical_filename, 'rb'))) |
| | 122 | assert len(nodes1) == len(nodes2), (len(nodes1), len(nodes2)) |
| | 123 | for node1, node2 in zip(nodes1, nodes2): |
| | 124 | _compare_nodes(node1, node2) |
| | 125 | finally: |
| | 126 | if verbose: |
| | 127 | print "NODES1:" |
| | 128 | pprint.pprint(nodes1) |
| | 129 | print "NODES2:" |
| | 130 | pprint.pprint(nodes2) |
| 102 | | def _testResolver(self, test_name, data_filename, canonical_filename): |
| 103 | | nodes1 = None |
| 104 | | nodes2 = None |
| 105 | | try: |
| 106 | | nodes1 = list(compose_all(file(data_filename, 'rb'))) |
| 107 | | nodes2 = list(test_appliance.canonical_compose_all(file(canonical_filename, 'rb'))) |
| 108 | | self.failUnlessEqual(len(nodes1), len(nodes2)) |
| 109 | | for node1, node2 in zip(nodes1, nodes2): |
| 110 | | self._compare(node1, node2) |
| 111 | | except: |
| 112 | | print |
| 113 | | print "DATA1:" |
| 114 | | print file(data_filename, 'rb').read() |
| 115 | | print "DATA2:" |
| 116 | | print file(canonical_filename, 'rb').read() |
| 117 | | print "NODES1:", nodes1 |
| 118 | | print "NODES2:", nodes2 |
| 119 | | raise |
| | 132 | test_composer.unittest = ['.data', '.canonical'] |
| 121 | | def _compare(self, node1, node2): |
| 122 | | self.failUnlessEqual(node1.__class__, node2.__class__) |
| 123 | | if isinstance(node1, ScalarNode): |
| 124 | | #self.failUnlessEqual(node1.tag, node2.tag) |
| 125 | | self.failUnlessEqual(node1.value, node2.value) |
| 126 | | elif isinstance(node1, SequenceNode): |
| 127 | | self.failUnlessEqual(len(node1.value), len(node2.value)) |
| 128 | | for item1, item2 in zip(node1.value, node2.value): |
| 129 | | self._compare(item1, item2) |
| 130 | | elif isinstance(node1, MappingNode): |
| 131 | | self.failUnlessEqual(len(node1.value), len(node2.value)) |
| 132 | | items1 = node1.value.items() |
| 133 | | items1.sort(lambda (k1,v1), (k2,v2): cmp((k1.tag,k1.value,v1.tag,v1.value), |
| 134 | | (k2.tag,k2.value,v2.tag,v2.value))) |
| 135 | | items2 = node2.value.items() |
| 136 | | items2.sort(lambda (k1,v1), (k2,v2): cmp((k1.tag,k1.value,v1.tag,v1.value), |
| 137 | | (k2.tag,k2.value,v2.tag,v2.value))) |
| 138 | | for (key1, value1), (key2, value2) in zip(items1, items2): |
| 139 | | self._compare(key1, key2) |
| 140 | | self._compare(value1, value2) |
| | 134 | def _make_loader(): |
| | 135 | global MyLoader |
| 142 | | TestResolver.add_tests('testResolver', '.data', '.canonical') |
| | 137 | class MyLoader(yaml.Loader): |
| | 138 | def construct_sequence(self, node): |
| | 139 | return tuple(yaml.Loader.construct_sequence(self, node)) |
| | 140 | def construct_mapping(self, node): |
| | 141 | pairs = self.construct_pairs(node) |
| | 142 | pairs.sort() |
| | 143 | return pairs |
| | 144 | def construct_undefined(self, node): |
| | 145 | return self.construct_scalar(node) |
| 153 | | def construct_undefined(self, node): |
| 154 | | return self.construct_scalar(node) |
| | 153 | class MyCanonicalLoader(yaml.CanonicalLoader): |
| | 154 | def construct_sequence(self, node): |
| | 155 | return tuple(yaml.CanonicalLoader.construct_sequence(self, node)) |
| | 156 | def construct_mapping(self, node): |
| | 157 | pairs = self.construct_pairs(node) |
| | 158 | pairs.sort() |
| | 159 | return pairs |
| | 160 | def construct_undefined(self, node): |
| | 161 | return self.construct_scalar(node) |
| 169 | | def construct_undefined(self, node): |
| 170 | | return self.construct_scalar(node) |
| 171 | | |
| 172 | | MyCanonicalLoader.add_constructor(u'tag:yaml.org,2002:map', MyCanonicalLoader.construct_mapping) |
| 173 | | MyCanonicalLoader.add_constructor(None, MyCanonicalLoader.construct_undefined) |
| 174 | | |
| 175 | | class TestConstructor(test_appliance.TestAppliance): |
| 176 | | |
| 177 | | def _testConstructor(self, test_name, data_filename, canonical_filename): |
| 178 | | data1 = None |
| 179 | | data2 = None |
| 180 | | try: |
| 181 | | data1 = list(load_all(file(data_filename, 'rb'), Loader=MyLoader)) |
| 182 | | data2 = list(load_all(file(canonical_filename, 'rb'), Loader=MyCanonicalLoader)) |
| 183 | | self.failUnlessEqual(data1, data2) |
| 184 | | except: |
| 185 | | print |
| 186 | | print "DATA1:" |
| 187 | | print file(data_filename, 'rb').read() |
| 188 | | print "DATA2:" |
| 189 | | print file(canonical_filename, 'rb').read() |
| 190 | | print "NATIVES1:", data1 |
| 191 | | print "NATIVES2:", data2 |
| 192 | | raise |
| 193 | | |
| 194 | | TestConstructor.add_tests('testConstructor', '.data', '.canonical') |
| 195 | | |
| 196 | | class TestParserOnCanonical(test_appliance.TestAppliance): |
| 197 | | |
| 198 | | def _testParserOnCanonical(self, test_name, canonical_filename): |
| 199 | | events1 = None |
| 200 | | events2 = None |
| 201 | | try: |
| 202 | | events1 = list(parse(file(canonical_filename, 'rb'))) |
| 203 | | events2 = list(test_appliance.canonical_parse(file(canonical_filename, 'rb'))) |
| 204 | | self._compare(events1, events2) |
| 205 | | except: |
| 206 | | print |
| 207 | | print "DATA:" |
| 208 | | print file(canonical_filename, 'rb').read() |
| 209 | | print "EVENTS1:", events1 |
| 210 | | print "EVENTS2:", events2 |
| 211 | | raise |
| 212 | | |
| 213 | | def _compare(self, events1, events2): |
| 214 | | self.failUnlessEqual(len(events1), len(events2)) |
| 215 | | for event1, event2 in zip(events1, events2): |
| 216 | | self.failUnlessEqual(event1.__class__, event2.__class__) |
| 217 | | if isinstance(event1, AliasEvent): |
| 218 | | self.failUnlessEqual(event1.anchor, event2.anchor) |
| 219 | | elif isinstance(event1, ScalarEvent): |
| 220 | | self.failUnlessEqual(event1.anchor, event2.anchor) |
| 221 | | self.failUnlessEqual(event1.tag, event2.tag) |
| 222 | | self.failUnlessEqual(event1.value, event2.value) |
| 223 | | if isinstance(event1, CollectionStartEvent): |
| 224 | | self.failUnlessEqual(event1.anchor, event2.anchor) |
| 225 | | self.failUnlessEqual(event1.tag, event2.tag) |
| 226 | | |
| 227 | | TestParserOnCanonical.add_tests('testParserOnCanonical', '.canonical') |
| 228 | | |