| 7 | | class TestEmitter(test_appliance.TestAppliance): |
| | 4 | def _compare_events(events1, events2): |
| | 5 | assert len(events1) == len(events2), (events1, events2) |
| | 6 | for event1, event2 in zip(events1, events2): |
| | 7 | assert event1.__class__ == event2.__class__, (event1, event2) |
| | 8 | if isinstance(event1, yaml.NodeEvent): |
| | 9 | assert event1.anchor == event2.anchor, (event1, event2) |
| | 10 | if isinstance(event1, yaml.CollectionStartEvent): |
| | 11 | assert event1.tag == event2.tag, (event1, event2) |
| | 12 | if isinstance(event1, yaml.ScalarEvent): |
| | 13 | if True not in event1.implicit+event2.implicit: |
| | 14 | assert event1.tag == event2.tag, (event1, event2) |
| | 15 | assert event1.value == event2.value, (event1, event2) |
| 18 | | def _testEmitter(self, test_name, filename, canonical=None): |
| 19 | | events = list(parse(file(filename, 'rb'))) |
| 20 | | #self._dump(filename, events, canonical) |
| 21 | | stream = StringIO.StringIO() |
| 22 | | emit(events, stream, canonical=canonical) |
| 23 | | data = stream.getvalue() |
| 24 | | new_events = list(parse(data)) |
| 25 | | for event, new_event in zip(events, new_events): |
| 26 | | self.failUnlessEqual(event.__class__, new_event.__class__) |
| 27 | | if isinstance(event, NodeEvent): |
| 28 | | self.failUnlessEqual(event.anchor, new_event.anchor) |
| 29 | | if isinstance(event, CollectionStartEvent): |
| 30 | | self.failUnlessEqual(event.tag, new_event.tag) |
| 31 | | if isinstance(event, ScalarEvent): |
| 32 | | #self.failUnlessEqual(event.implicit, new_event.implicit) |
| 33 | | if True not in event.implicit+new_event.implicit: |
| 34 | | self.failUnlessEqual(event.tag, new_event.tag) |
| 35 | | self.failUnlessEqual(event.value, new_event.value) |
| | 38 | test_emitter_on_canonical.unittest = ['.canonical'] |
| 37 | | def _testEmitterStyles(self, test_name, canonical_filename, data_filename): |
| 38 | | for filename in [canonical_filename, data_filename]: |
| 39 | | events = list(parse(file(filename, 'rb'))) |
| 40 | | for flow_style in [False, True]: |
| 41 | | for style in ['|', '>', '"', '\'', '']: |
| 42 | | styled_events = [] |
| 43 | | for event in events: |
| 44 | | if isinstance(event, ScalarEvent): |
| 45 | | event = ScalarEvent(event.anchor, event.tag, |
| 46 | | event.implicit, event.value, style=style) |
| 47 | | elif isinstance(event, SequenceStartEvent): |
| 48 | | event = SequenceStartEvent(event.anchor, event.tag, |
| 49 | | event.implicit, flow_style=flow_style) |
| 50 | | elif isinstance(event, MappingStartEvent): |
| 51 | | event = MappingStartEvent(event.anchor, event.tag, |
| 52 | | event.implicit, flow_style=flow_style) |
| 53 | | styled_events.append(event) |
| 54 | | stream = StringIO.StringIO() |
| 55 | | emit(styled_events, stream) |
| 56 | | data = stream.getvalue() |
| 57 | | #print data |
| 58 | | new_events = list(parse(data)) |
| 59 | | for event, new_event in zip(events, new_events): |
| 60 | | self.failUnlessEqual(event.__class__, new_event.__class__) |
| 61 | | if isinstance(event, NodeEvent): |
| 62 | | self.failUnlessEqual(event.anchor, new_event.anchor) |
| 63 | | if isinstance(event, CollectionStartEvent): |
| 64 | | self.failUnlessEqual(event.tag, new_event.tag) |
| 65 | | if isinstance(event, ScalarEvent): |
| 66 | | #self.failUnlessEqual(event.implicit, new_event.implicit) |
| 67 | | if True not in event.implicit+new_event.implicit: |
| 68 | | self.failUnlessEqual(event.tag, new_event.tag) |
| 69 | | self.failUnlessEqual(event.value, new_event.value) |
| | 40 | def test_emitter_styles(data_filename, canonical_filename, verbose=False): |
| | 41 | for filename in [data_filename, canonical_filename]: |
| | 42 | events = list(yaml.parse(open(filename, 'rb'))) |
| | 43 | for flow_style in [False, True]: |
| | 44 | for style in ['|', '>', '"', '\'', '']: |
| | 45 | styled_events = [] |
| | 46 | for event in events: |
| | 47 | if isinstance(event, yaml.ScalarEvent): |
| | 48 | event = yaml.ScalarEvent(event.anchor, event.tag, |
| | 49 | event.implicit, event.value, style=style) |
| | 50 | elif isinstance(event, yaml.SequenceStartEvent): |
| | 51 | event = yaml.SequenceStartEvent(event.anchor, event.tag, |
| | 52 | event.implicit, flow_style=flow_style) |
| | 53 | elif isinstance(event, yaml.MappingStartEvent): |
| | 54 | event = yaml.MappingStartEvent(event.anchor, event.tag, |
| | 55 | event.implicit, flow_style=flow_style) |
| | 56 | styled_events.append(event) |
| | 57 | output = yaml.emit(styled_events) |
| | 58 | if verbose: |
| | 59 | print "OUTPUT (filename=%r, flow_style=%r, style=%r)" % (filename, flow_style, style) |
| | 60 | print output |
| | 61 | new_events = list(yaml.parse(output)) |
| | 62 | _compare_events(events, new_events) |
| 72 | | def _dump(self, filename, events, canonical): |
| 73 | | print "="*30 |
| 74 | | print "ORIGINAL DOCUMENT:" |
| 75 | | print file(filename, 'rb').read() |
| 76 | | print '-'*30 |
| 77 | | print "EMITTED DOCUMENT:" |
| 78 | | emit(events, sys.stdout, canonical=canonical) |
| 79 | | |
| 80 | | TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data') |
| 81 | | TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical') |
| 82 | | TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical') |
| 83 | | TestEmitter.add_tests('testEmitterStyles', '.canonical', '.data') |
| 84 | | |
| 85 | | class EventsLoader(Loader): |
| | 66 | class EventsLoader(yaml.Loader): |
| 109 | | def _testEmitterEvents(self, test_name, events_filename): |
| 110 | | events = list(load(file(events_filename, 'rb'), Loader=EventsLoader)) |
| 111 | | #self._dump(events_filename, events) |
| 112 | | stream = StringIO.StringIO() |
| 113 | | emit(events, stream) |
| 114 | | data = stream.getvalue() |
| 115 | | new_events = list(parse(data)) |
| 116 | | self.failUnlessEqual(len(events), len(new_events)) |
| 117 | | for event, new_event in zip(events, new_events): |
| 118 | | self.failUnlessEqual(event.__class__, new_event.__class__) |
| 119 | | if isinstance(event, NodeEvent): |
| 120 | | self.failUnlessEqual(event.anchor, new_event.anchor) |
| 121 | | if isinstance(event, CollectionStartEvent): |
| 122 | | self.failUnlessEqual(event.tag, new_event.tag) |
| 123 | | if isinstance(event, ScalarEvent): |
| 124 | | self.failUnless(event.implicit == new_event.implicit |
| 125 | | or event.tag == new_event.tag) |
| 126 | | self.failUnlessEqual(event.value, new_event.value) |
| | 97 | if __name__ == '__main__': |
| | 98 | import test_appliance |
| | 99 | test_appliance.run(globals()) |