From d0e440a304b341a3376ef4a742dc73afd4c0e00b Mon Sep 17 00:00:00 2001 From: Robin Dunn Date: Sat, 20 Apr 2013 21:53:50 +0000 Subject: [PATCH] Add pubsub unittests, adapted by Werner git-svn-id: https://svn.wxwidgets.org/svn/wx/wxPython/Phoenix/trunk@73830 c3d73ce0-8a6f-49c7-b76d-6d57e0e08775 --- .../lib_pubsub_except_raisinglistener.py | 7 + unittests/lib_pubsub_provider_actual.py | 36 ++ unittests/lib_pubsub_provider_expect.py | 36 ++ .../lib_pubsub_provider_my_import_topics.py | 14 + unittests/test_lib_pubsub_api3.py | 313 +++++++++++++ unittests/test_lib_pubsub_defaultlog.py | 38 ++ unittests/test_lib_pubsub_except.py | 148 ++++++ unittests/test_lib_pubsub_listener.py | 349 ++++++++++++++ unittests/test_lib_pubsub_notify.py | 161 +++++++ unittests/test_lib_pubsub_notify2.py | 116 +++++ unittests/test_lib_pubsub_provider.py | 260 +++++++++++ unittests/test_lib_pubsub_spec.py | 132 ++++++ unittests/test_lib_pubsub_topic.py | 168 +++++++ unittests/test_lib_pubsub_topicmgr.py | 426 ++++++++++++++++++ 14 files changed, 2204 insertions(+) create mode 100644 unittests/lib_pubsub_except_raisinglistener.py create mode 100644 unittests/lib_pubsub_provider_actual.py create mode 100644 unittests/lib_pubsub_provider_expect.py create mode 100644 unittests/lib_pubsub_provider_my_import_topics.py create mode 100644 unittests/test_lib_pubsub_api3.py create mode 100644 unittests/test_lib_pubsub_defaultlog.py create mode 100644 unittests/test_lib_pubsub_except.py create mode 100644 unittests/test_lib_pubsub_listener.py create mode 100644 unittests/test_lib_pubsub_notify.py create mode 100644 unittests/test_lib_pubsub_notify2.py create mode 100644 unittests/test_lib_pubsub_provider.py create mode 100644 unittests/test_lib_pubsub_spec.py create mode 100644 unittests/test_lib_pubsub_topic.py create mode 100644 unittests/test_lib_pubsub_topicmgr.py diff --git a/unittests/lib_pubsub_except_raisinglistener.py b/unittests/lib_pubsub_except_raisinglistener.py new file mode 100644 index 00000000..9af8f3be --- /dev/null +++ b/unittests/lib_pubsub_except_raisinglistener.py @@ -0,0 +1,7 @@ +def getRaisingListener(): + def raisingListener(): + def nested(): + raise RuntimeError2('test') + nested() + + return raisingListener \ No newline at end of file diff --git a/unittests/lib_pubsub_provider_actual.py b/unittests/lib_pubsub_provider_actual.py new file mode 100644 index 00000000..b8ccadd1 --- /dev/null +++ b/unittests/lib_pubsub_provider_actual.py @@ -0,0 +1,36 @@ +# Automatically generated by TopicTreeAsSpec(**kwargs). +# The kwargs were: +# - fileObj: file +# - width: 70 +# - treeDoc: 'Tree docs, can be anything you want....' +# - indentStep: 4 +# - footer: '# End of topic tree definition. Note that application may l...' + + +''' +Tree docs, can be anything you want. +''' + + +class test_import_export_no_change2: + ''' + Root topic 1. + ''' + + class subtopic_1: + ''' + Sub topic 1 of root topic. Docs rely on one blank line for + topic doc, and indentation for each argument doc. + ''' + + def msgDataSpec(arg1, arg2=None): + ''' + - arg1: some multiline doc + for arg1 + - arg2: some multiline doc + for arg2 + ''' + + +# End of topic tree definition. Note that application may load +# more than one definitions provider. diff --git a/unittests/lib_pubsub_provider_expect.py b/unittests/lib_pubsub_provider_expect.py new file mode 100644 index 00000000..b8ccadd1 --- /dev/null +++ b/unittests/lib_pubsub_provider_expect.py @@ -0,0 +1,36 @@ +# Automatically generated by TopicTreeAsSpec(**kwargs). +# The kwargs were: +# - fileObj: file +# - width: 70 +# - treeDoc: 'Tree docs, can be anything you want....' +# - indentStep: 4 +# - footer: '# End of topic tree definition. Note that application may l...' + + +''' +Tree docs, can be anything you want. +''' + + +class test_import_export_no_change2: + ''' + Root topic 1. + ''' + + class subtopic_1: + ''' + Sub topic 1 of root topic. Docs rely on one blank line for + topic doc, and indentation for each argument doc. + ''' + + def msgDataSpec(arg1, arg2=None): + ''' + - arg1: some multiline doc + for arg1 + - arg2: some multiline doc + for arg2 + ''' + + +# End of topic tree definition. Note that application may load +# more than one definitions provider. diff --git a/unittests/lib_pubsub_provider_my_import_topics.py b/unittests/lib_pubsub_provider_my_import_topics.py new file mode 100644 index 00000000..d7dff4ec --- /dev/null +++ b/unittests/lib_pubsub_provider_my_import_topics.py @@ -0,0 +1,14 @@ +class root_topic1: + 'docs for root_topic1' + + def msgDataSpec(): + pass + + class sub_topic11: + 'docs for sub_topic11' + +class root_topic2: + 'docs for root_topic2' + + class sub_topic21: + 'docs for sub_topic21' diff --git a/unittests/test_lib_pubsub_api3.py b/unittests/test_lib_pubsub_api3.py new file mode 100644 index 00000000..84ea2683 --- /dev/null +++ b/unittests/test_lib_pubsub_api3.py @@ -0,0 +1,313 @@ +""" +Except for one test, this file tests with auto-creation of topics +disabled, as it is more rigorous for testing purposes. + +:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved. +:license: BSD, see LICENSE.txt for details. + + +""" + +import imp_unittest, unittest +import wtc + +from wx.lib.pubsub import pub +from wx.lib.pubsub.utils.notification import IgnoreNotificationsMixin + + +#--------------------------------------------------------------------------- + + +class lib_pubsub_Except(wtc.WidgetTestCase): + + + def testDOAListenerPubsub(self): + # Verify that a 'temporary' listener (one that will be garbage collected + # as soon as subscribe() returns because there are no strong references to + # it) gets immediately unregistered + + def listener(): + pass + class Wrapper: + def __init__(self, func): + self.func = func + def __call__(self): + pass + + pub.subscribe( Wrapper(listener), 'testDOAListenerPubsub') + assert not pub.getTopic('testDOAListenerPubsub').hasListeners() + assert pub.isValid(listener, 'testDOAListenerPubsub') + + + def testDeadListener(self): + # create a listener for listeners that have died + class DeathListener(IgnoreNotificationsMixin): + listenerStr = '' + def notifyDeadListener(self, pubListener, topicObj): + self.assertEqual(topicObj.getName(), 'sadTopic') + DeathListener.listenerStr = pubListener.name() + dl = DeathListener() + dl.assertEqual = self.assertEqual + pub.addNotificationHandler(dl) + pub.setNotificationFlags(deadListener=True) + + # define a topic, subscribe to it, and kill its listener: + class TempListener: + def __call__(self, **kwargs): + pass + def __del__(self): + pass #print 'being deleted' + tempListener = TempListener() + expectLisrStr, _ = pub.getListenerID(tempListener) + pub.subscribe(tempListener, 'sadTopic') + del tempListener + + # verify: + assert DeathListener.listenerStr.startswith(expectLisrStr), \ + '"%s" !~ "%s"' % (DeathListener.listenerStr, expectLisrStr) + + pub.addNotificationHandler(None) + pub.clearNotificationHandlers() + + + def testSubscribe(self): + topicName = 'testSubscribe' + def proto(a, b, c=None): + pass + pub.getOrCreateTopic(topicName, proto) + + def listener(a, b, c=None): pass + # verify that pub.isValid() works too + pub.validate(listener, topicName) + assert pub.isValid(listener, topicName) + + self.assertEqual(pub.getTopic(topicName).getNumListeners(), 0) + self.assertEqual(pub.getAssociatedTopics(listener), []) + assert not pub.isSubscribed(listener, topicName) + assert pub.subscribe(listener, topicName) + assert pub.isSubscribed(listener, topicName) + def topicNames(listener): + return [t.getName() for t in pub.getAssociatedTopics(listener)] + self.assertEqual(topicNames(listener), [topicName]) + # should do nothing if already subscribed: + assert not pub.subscribe(listener, topicName)[1] + self.assertEqual(pub.getTopic(topicName).getNumListeners(), 1) + + # test pub.getAssociatedTopics() + pub.subscribe(listener, 'lt2', ) + self.assertEqual(set(topicNames(listener)), + set([topicName,'lt2'])) + pub.subscribe(listener, 'lt1.lst1') + self.assertEqual(set(topicNames(listener)), + set([topicName,'lt2','lt1.lst1'])) + + # test ALL_TOPICS + def listenToAll(): + pass + pub.subscribe(listenToAll, pub.ALL_TOPICS) + self.assertEqual(topicNames(listenToAll), [pub.ALL_TOPICS]) + + + def testMissingReqdArgs(self): + def proto(a, b, c=None): + pass + pub.getOrCreateTopic('missingReqdArgs', proto) + self.assertRaises(pub.SenderMissingReqdArgs, pub.sendMessage, + 'missingReqdArgs', a=1) + + + def testSendTopicWithMessage(self): + class MyListener: + def __init__(self): + self.count = 0 + self.heardTopic = False + self.listen2Topics = [] + def listen0(self): + pass + def listen1(self, **kwarg): + self.count += 1 + self.heardTopic = True + def listen2(self, msgTopic=pub.AUTO_TOPIC, **kwarg): + self.listen2Topics.append(msgTopic.getName()) + + my = MyListener() + pub.subscribe(my.listen0, 'testSendTopic') + pub.subscribe(my.listen1, 'testSendTopic') + pub.subscribe(my.listen2, 'testSendTopic') + + pub.sendMessage('testSendTopic') + self.assertEqual(my.count, 1) + self.assertEqual(my.heardTopic, True) + + pub.subscribe(my.listen0, 'testSendTopic.subtopic') + pub.subscribe(my.listen1, 'testSendTopic.subtopic') + pub.subscribe(my.listen2, 'testSendTopic.subtopic') + + pub.sendMessage('testSendTopic.subtopic') + self.assertEqual(my.count, 3) + self.assertEqual([], [topic for topic in my.listen2Topics + if topic not in ('testSendTopic', 'testSendTopic.subtopic')] ) + + + def testAcceptAllArgs(self): + def listen(arg1=None): + pass + def listenAllArgs(arg1=None, **kwargs): + pass + def listenAllArgs2(arg1=None, msgTopic=pub.AUTO_TOPIC, **kwargs): + pass + + pub.subscribe(listen, 'testAcceptAllArgs') + + pub.subscribe(listenAllArgs, 'testAcceptAllArgs') + pub.subscribe(listenAllArgs2, 'testAcceptAllArgs') + + pub.subscribe(listenAllArgs2, 'testAcceptAllArgs.subtopic') + pub.subscribe(listenAllArgs, 'testAcceptAllArgs.subtopic') + + + def testUnsubAll(self): + def lisnr1(): + pass + def lisnr2(): + pass + class MyListener: + def __call__(self): + pass + def meth(self): + pass + def __hash__(self): + return 123 + lisnr3 = MyListener() + lisnr4 = lisnr3.meth + def lisnrSub(listener=None, topic=None, newSub=None): pass + pub.subscribe(lisnrSub, 'pubsub.subscribe') + self.assertEqual(pub.getTopic('pubsub.subscribe').getNumListeners(), 1) + + def subAll(): + pub.subscribe(lisnr1, 'testUnsubAll') + pub.subscribe(lisnr2, 'testUnsubAll') + pub.subscribe(lisnr3, 'testUnsubAll') + pub.subscribe(lisnr4, 'testUnsubAll') + self.assertEqual(pub.getTopic('testUnsubAll').getNumListeners(), 4) + + def filter(lisnr): + passes = str(lisnr).endswith('meth') + return passes + + # test unsub many non-pubsub topic listeners + subAll() + pub.unsubAll('testUnsubAll') + self.assertEqual(pub.getTopic('testUnsubAll').getNumListeners(), 0) + self.assertEqual(pub.getTopic('pubsub.subscribe').getNumListeners(), 1) + # now same but with filter: + subAll() + unsubed = pub.unsubAll('testUnsubAll', listenerFilter=filter) + self.assertEqual(pub.getTopic('testUnsubAll').getNumListeners(), 3) + self.assertEqual(pub.getTopic('pubsub.subscribe').getNumListeners(), 1) + + # test unsub all listeners of all topics + subAll() + self.assertEqual(pub.getTopic('testUnsubAll').getNumListeners(), 4) + unsubed = pub.unsubAll(listenerFilter=filter) + self.assertEqual(unsubed, [lisnr4]) + self.assertEqual(pub.getTopic('testUnsubAll').getNumListeners(), 3) + self.assertEqual(pub.getTopic('pubsub.subscribe').getNumListeners(), 1) + unsubed = set( pub.unsubAll() ) + expect = set([lisnr1, lisnrSub, lisnr3, lisnr2]) + # at least all the 'expected' ones were unsub'd; will be others if this + # test is run after other unit tests in same nosetests run + assert unsubed >= expect + + + def testSendForUndefinedTopic(self): + pub.sendMessage('testSendForUndefinedTopic') + assert pub.getTopic('testSendForUndefinedTopic') + self.assertEqual(pub.getTopic('testSendForUndefinedTopic').getArgs(), + (None, None)) + + # must also check for subtopics if parents have listeners since + # filtering of args is affected + def listener(): + pass + pub.subscribe(listener, 'testSendForUndefinedTopic') + pub.sendMessage('testSendForUndefinedTopic.subtopic', msg='something') + + def testTopicUnspecifiedError(self): + self.assertRaises(pub.ListenerSpecIncomplete, pub.setTopicUnspecifiedFatal) + pub.setTopicUnspecifiedFatal(checkExisting=False) + def fn(): + pass + LSI = pub.ListenerSpecIncomplete + self.assertRaises(LSI, pub.sendMessage, 'testTopicUnspecifiedError') + self.assertRaises(LSI, pub.subscribe, fn, 'testTopicUnspecifiedError') + pub.setTopicUnspecifiedFatal(False) + pub.sendMessage('testTopicUnspecifiedError') + pub.subscribe(fn, 'testTopicUnspecifiedError') + + + def testArgSpecDerivation(self): + def ok_0(): + pass + + def ok_1(arg1): + pass + def err_11(arg1=None): + pass # required can't become optional! + def err_12(arg2): + pass # parent's arg1 missing + + def ok_2(arg1=None): + pass + def ok_21(arg1): + pass # optional can become required + def err_22(arg2): + pass # parent's arg1 missing + + # with getOrCreateTopic(topic, proto), the 'required args' set + # is garanteed to be a subset of 'all args' + pub.getOrCreateTopic('tasd', ok_0) + pub.getOrCreateTopic('tasd.t_1', ok_1) + self.assertRaises(pub.ListenerSpecInvalid, pub.getOrCreateTopic, + 'tasd.t_1.t_11', err_11) + self.assertRaises(pub.ListenerSpecInvalid, pub.getOrCreateTopic, + 'tasd.t_1.t_12', err_12) + pub.getOrCreateTopic('tasd.t_2', ok_2) + pub.getOrCreateTopic('tasd.t_2.t_21', ok_21) + self.assertRaises(pub.ListenerSpecInvalid, pub.getOrCreateTopic, + 'tasd.t_2.t_22', err_22) + + # with newTopic(), 'required args' specified separately so + # verify that errors caught + def check(subName, required=(), **args): + tName = 'tasd.'+subName + try: + pub.newTopic(tName, 'desc', required, **args) + msg = 'Should have raised pub.ListenerSpecInvalid for %s, %s, %s' + assert False, msg % (tName, required, args) + except pub.ListenerSpecInvalid, exc: + #import traceback + #traceback.print_exc() + print 'As expected: ', exc + + pub.newTopic('tasd.t_1.t_13', 'desc', ('arg1',), arg1='docs for arg1') # ok_1 + check('t_1.t_14', arg1='docs for arg1') # err_11 + check('t_1.t_15', ('arg2',), arg2='docs for arg2') # err_12 + + pub.newTopic('tasd.t_2.t_23', 'desc', ('arg1',), arg1='docs for arg1') # ok_21 + check('t_2.t_24', ('arg2',), arg2='docs for arg2') # err_22 + + # check when no inheritence involved + # reqd args wrong + check('t_1.t_16', ('arg1',), arg2='docs for arg2') + check('t_1.t_17', ('arg2',), arg1='docs for arg1') + check('t_3', ('arg1',), arg2='docs for arg2') + + +#--------------------------------------------------------------------------- + + +if __name__ == '__main__': + unittest.main() + + \ No newline at end of file diff --git a/unittests/test_lib_pubsub_defaultlog.py b/unittests/test_lib_pubsub_defaultlog.py new file mode 100644 index 00000000..b1a8e956 --- /dev/null +++ b/unittests/test_lib_pubsub_defaultlog.py @@ -0,0 +1,38 @@ +""" + +:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved. +:license: BSD, see LICENSE.txt for details. + + +""" + +import imp_unittest, unittest +import wtc + +from wx.lib.pubsub import pub +from wx.lib.pubsub.utils import notification +from StringIO import StringIO + + +#--------------------------------------------------------------------------- + + +class lib_pubsub_DefaultLog(wtc.WidgetTestCase): + + + def testNotifications(self): + capture = StringIO() + logger = notification.useNotifyByWriteFile(capture) + def block(): + def listener(): pass + pub.subscribe(listener, 'testNotifications') + block() + + +#--------------------------------------------------------------------------- + + +if __name__ == '__main__': + unittest.main() + + \ No newline at end of file diff --git a/unittests/test_lib_pubsub_except.py b/unittests/test_lib_pubsub_except.py new file mode 100644 index 00000000..22363dfb --- /dev/null +++ b/unittests/test_lib_pubsub_except.py @@ -0,0 +1,148 @@ +""" + +:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved. +:license: BSD, see LICENSE.txt for details. + + +""" + +import imp_unittest, unittest +import wtc + +from wx.lib.pubsub import pub + + +def throws(): + raise RuntimeError('test') + +#--------------------------------------------------------------------------- + + +class lib_pubsub_Except(wtc.WidgetTestCase): + + + def testHandleExcept1a(self): + from wx.lib.pubsub.utils.exchandling import ExcPublisher + excPublisher = ExcPublisher( pub.getDefaultTopicMgr() ) + pub.setListenerExcHandler(excPublisher) + + # create a listener that raises an exception: + from lib_pubsub_except_raisinglistener import getRaisingListener + raisingListener = getRaisingListener() + + pub.setNotificationFlags(all=False) + pub.subscribe(raisingListener, 'testHandleExcept1a') + + # first test when a listener raises an exception and exception listener also raises! + class BadUncaughtExcListener: + def __call__(self, listenerStr=None, excTraceback=None): + raise RuntimeError('bad exception listener!') + handler = BadUncaughtExcListener() + pub.subscribe(handler, ExcPublisher.topicUncaughtExc) + self.assertRaises(pub.ExcHandlerError, pub.sendMessage, + 'testHandleExcept1a') + + def testHandleExcept1b(self): + # create a listener that raises an exception: + from lib_pubsub_except_raisinglistener import getRaisingListener + raisingListener = getRaisingListener() + pub.subscribe(raisingListener, 'testHandleExcept1b') + + # subscribe a good exception listener and validate + # create the listener for uncaught exceptions in listeners: + class UncaughtExcListener: + def __call__(self, listenerStr=None, excTraceback=None): + # verify that information received; first the listenerStr + assert listenerStr.startswith('raisingListener') + # next the traceback: + tb = excTraceback.traceback + self.assertEqual(len(tb), 2) + def validateTB(tbItem, eFN, eLine, eFnN, assertE=self.assertEqual): + assert tbItem[0].endswith(eFN), '%s !~ %s' % (tbItem[0], eFN) + assertE(tbItem[1], eLine) + assertE(tbItem[2], eFnN) + validateTB(tb[0], 'lib_pubsub_except_raisinglistener.py', 5, 'raisingListener') + validateTB(tb[1], 'lib_pubsub_except_raisinglistener.py', 4, 'nested') + # next the formatted traceback: + self.assertEqual(len(excTraceback.getFormattedList() ), len(tb)+1) + # finally the string for formatted traceback: + msg = excTraceback.getFormattedString() + #print 'Msg "%s"' % msg + assert msg.startswith(' File') + assert msg.endswith("global name 'RuntimeError2' is not defined\n") + + from wx.lib.pubsub.utils.exchandling import ExcPublisher + topic = pub.getTopic(ExcPublisher.topicUncaughtExc) + assert not topic.hasListeners() + handler = UncaughtExcListener() + handler.assertEqual = self.assertEqual + pub.subscribe(handler, ExcPublisher.topicUncaughtExc) + pub.sendMessage('testHandleExcept1b') + + # verify that listener isn't stuck in a cyclic reference by sys.exc_info() + del raisingListener + assert not pub.getTopic('testHandleExcept1b').hasListeners() + + + def testHandleExcept2(self): + #Test sendMessage when one handler, then change handler and verify changed + testTopic = 'testTopics.testHandleExcept2' + pub.subscribe(throws, testTopic) + pub.setListenerExcHandler(None) + #pubsub.utils.notification.useNotifyByWriteFile() + #assert_equal( pub.getTopic(testTopic).getNumListeners(), 1 ) + + expect = None + + def validate(className): + global expect + assert expect == className + expect = None + + class MyExcHandler: + def __call__(self, listener, topicObj): + validate(self.__class__.__name__) + + class MyExcHandler2: + def __call__(self, listener, topicObj): + validate(self.__class__.__name__) + + def doHandling(HandlerClass): + global expect + expect = HandlerClass.__name__ #'MyExcHandler' + excHandler = HandlerClass() + pub.setListenerExcHandler(excHandler) + pub.sendMessage(testTopic) + assert expect is None + + doHandling(MyExcHandler) + doHandling(MyExcHandler2) + + # restore to no handling and verify: + pub.setListenerExcHandler(None) + self.assertRaises(RuntimeError, pub.sendMessage, testTopic) + + + def testNoExceptionHandling1(self): + pub.setListenerExcHandler(None) + + def raises(): + raise RuntimeError('test') + pub.getOrCreateTopic('testNoExceptionTrapping') + pub.subscribe(raises, 'testNoExceptionTrapping') + self.assertRaises(RuntimeError, pub.sendMessage, 'testNoExceptionTrapping') + + + def testNoExceptionHandling2(self): + testTopic = 'testTopics.testNoExceptionHandling' + pub.subscribe(throws, testTopic) + assert pub.getListenerExcHandler() is None + self.assertRaises(RuntimeError, pub.sendMessage, testTopic) + + +#--------------------------------------------------------------------------- + + +if __name__ == '__main__': + unittest.main() + diff --git a/unittests/test_lib_pubsub_listener.py b/unittests/test_lib_pubsub_listener.py new file mode 100644 index 00000000..a6b44a8a --- /dev/null +++ b/unittests/test_lib_pubsub_listener.py @@ -0,0 +1,349 @@ +""" + +:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved. +:license: BSD, see LICENSE.txt for details. + +""" + +import imp_unittest, unittest +import wtc + +from wx.lib.pubsub.core.weakmethod import WeakMethod +from wx.lib.pubsub.core import listener +from wx.lib.pubsub.core.listener import \ + Listener, ListenerValidator, \ + ListenerInadequate, \ + CallArgsInfo, \ + getArgs + +#--------------------------------------------------------------------------- + +class ArgsInfoMock: + def __init__(self, autoTopicArgName=None): + self.autoTopicArgName = autoTopicArgName + self.acceptsAllKwargs = False + +#--------------------------------------------------------------------------- + + +class lib_pubsub_ArgsInfo(wtc.WidgetTestCase): + + def test0_ArgsInfo(self): + def listener0(msgTopic = Listener.AUTO_TOPIC): + pass + CallArgsInfo(listener0, 0) + + def listener1(arg1, msgTopic = Listener.AUTO_TOPIC): + pass + CallArgsInfo(listener1, 1) + + + def test2_Validation0(self): + # Test when ValidatorSameKwargsOnly used, ie when args in + # listener and topic must be exact match (unless *arg). + AA = Listener.AUTO_TOPIC + + # test for topic that has no arg/kwargs in topic message spec (TMS) + def same(): + pass + def varargs(*args, **kwargs): + pass + def autoArg(msgTopic=AA): + pass + def extraArg(a): + pass + def extraKwarg(a=1): + pass + + # no arg/kwarg in topic message spec (TMS) + validator = ListenerValidator([], []) + validate = validator.validate + + validate(same) # ok: same + validate(varargs) # ok: *args/**kwargs + validate(autoArg) # ok: extra but AUTO_TOPIC + self.assertRaises(ListenerInadequate, validate, extraArg) # E: extra arg + self.assertRaises(ListenerInadequate, validate, extraKwarg) # E: extra kwarg + + def test2_Validation1(self): + # one arg/kwarg in topic + validator = ListenerValidator(['a'], ['b']) + validate = validator.validate + + def same(a, b=1): + pass + def same2(a=2, b=1): + pass + def varkwargs(**kwargs): + pass + def varkwargs_a(a, **kwargs): + pass + + def opt_reqd(b, **kwargs): + pass + def missing_arg(b=1): + pass + def missing_kwarg(a): + pass + def extra_kwarg1(a,b=1,c=2): + pass + def extra_kwarg2(c=1, *args, **kwargs): + pass + def extra_arg1(a,c,b=1): + pass + def extra_arg2(a,b,c=2): + pass + + validate(same) # ok: same + validate(same2) # ok: same even if a now has default value + validate(varkwargs_a) # ok: has **kwargs + validate(varkwargs) # ok: has **kwargs + + self.assertRaises(ListenerInadequate, validate, opt_reqd) # E: b now required + self.assertRaises(ListenerInadequate, validate, missing_arg) # E: missing arg + self.assertRaises(ListenerInadequate, validate, missing_kwarg) # E: missing kwarg + self.assertRaises(ListenerInadequate, validate, extra_kwarg1) # E: extra kwarg + self.assertRaises( ListenerInadequate, validate, extra_kwarg2) # E: extra kwarg + self.assertRaises( ListenerInadequate, validate, extra_arg1) # E: extra arg + self.assertRaises( ListenerInadequate, validate, extra_arg2) # E: extra arg + + + def test3_IsCallable(self): + # Test the proper trapping of non-callable and certain types of + # callable objects. + + # validate different types of callables + validator = ListenerValidator([], []) + # not a function: + notAFunc = 1 # just pick something that is not a function + self.assertRaises(ListenerInadequate, validator.validate, notAFunc) + # a regular function: + def aFunc(): + pass + validator.validate(aFunc) + + # a functor and a method + class Foo(object): + def __call__(self): + pass + def meth(self): + pass + foo = Foo() + validator.validate(foo) + validator.validate(foo.meth) + + + def test4_WantTopic(self): + # Test the correct determination of whether want topic + # auto-passed during sendMessage() calls. + + # first check proper breakdown of listener args: + def listener(a, b=1): + pass + argsInfo = CallArgsInfo(listener, 0) + self.assertEqual(None, argsInfo.autoTopicArgName ) + + msgTopic = 'auto' + class MyListener: + def method(self, a, b=1, auto=Listener.AUTO_TOPIC): + pass + listener = MyListener() + argsInfo = getArgs(listener.method) + self.assertEqual(msgTopic, argsInfo.autoTopicArgName ) + self.assertEqual(['a','b'], argsInfo.allParams ) + + # now some white box testing of validator that makes use of args info: + def checkWantTopic(validate, listener, wantTopicAsArg=None): + argsInfo = getArgs(listener) + self.assertEqual(argsInfo.autoTopicArgName, wantTopicAsArg) + validate(listener) + + validator = ListenerValidator([], ['a']) + validate = validator.validate + def noWant(a=1): + pass + def want1(a=1, auto=Listener.AUTO_TOPIC): + pass + checkWantTopic(validate, noWant) + checkWantTopic(validate, want1, msgTopic) + + validator = ListenerValidator(['a'], ['b']) + validate = validator.validate + + def noWant2(a, b=1): + pass + def want2(a, auto=Listener.AUTO_TOPIC, b=1): + pass + checkWantTopic(validate, noWant2) + checkWantTopic(validate, want2, msgTopic) + + # topic that has Listener.AUTO_TOPIC as an arg rather than kwarg + validator = ListenerValidator([msgTopic], ['b']) + validate = validator.validate + def noWant3(auto, b=1): + pass + checkWantTopic(validate, noWant3) + + + def test5_DOAListeners(self): + # Test "dead on arrival" + + # test DOA of unbound method + def getListener1(): + class DOA: + def tmpFn(self): + pass + Listener( DOA.tmpFn, ArgsInfoMock() ) + self.assertRaises(ValueError, getListener1) + + # test DOA of tmp callable: + def getListener2(): + def fn(): + pass + class Wrapper: + def __init__(self, func): + self.func = func + def __call__(self): + pass + def onDead(listenerObj): + pass + + # check dead-on-arrival when no death callback specified: + doa1 = Listener( Wrapper(fn), ArgsInfoMock() ) + assert doa1.getCallable() is None + assert doa1.isDead() + self.assertRaises(RuntimeError, doa1, None, {}) + + # check dead-on-arrival when a death callback specified: + doa2 = Listener( Wrapper(fn), ArgsInfoMock(), onDead ) + assert doa2.getCallable() is None + assert doa2.isDead() + self.assertRaises(RuntimeError, doa2, None, {}) + + getListener2() + + + def test6_ListenerEq(self): + # Test equality tests of two listeners + + def listener1(): + pass + def listener2(): + pass + l1 = Listener(listener1, ArgsInfoMock()) + l2 = Listener(listener2, ArgsInfoMock()) + # verify that Listener can be compared for equality to another Listener, weakref, or callable + self.assertEqual (l1, l1) + self.assertNotEqual (l1, l2) + self.assertEqual (l1, listener1) + self.assertNotEqual (l1, listener2) + ll = [l1] + assert listener1 in ll + assert listener2 not in ll + self.assertEqual(ll.index(listener1), 0) + + # now for class method listener: + class MyListener: + def __call__(self): + pass + def meth(self): + pass + listener3 = MyListener() + l3 = Listener(listener3, ArgsInfoMock() ) + self.assertNotEqual (l3, l1) + self.assertNotEqual (l3, l2) + self.assertNotEqual (l3, listener2) + self.assertEqual (l3, l3) + self.assertEqual (l3, listener3) + self.assertNotEqual (l3, listener3.__call__) + + l4 = Listener(listener3.meth, ArgsInfoMock()) + self.assertEqual (l4, l4) + self.assertNotEqual (l4, l3) + self.assertNotEqual (l4, l2) + self.assertNotEqual (l4, listener3.__call__) + self.assertEqual (l4, listener3.meth) + + + def test7_DyingListenersClass(self): + # Test notification callbacks when listener dies + + # test dead listener notification + def onDead(weakListener): + lsrs.remove(weakListener) + + def listener1(): + pass + def listener2(): + pass + def listener3(): + pass + lsrs = [] + lsrs.append(Listener(listener1, ArgsInfoMock(False), onDead=onDead)) + lsrs.append(Listener(listener2, ArgsInfoMock(False), onDead=onDead)) + lsrs.append(Listener(listener3, ArgsInfoMock(False), onDead=onDead)) + + # now force some listeners to die, verify lsrs list + self.assertEqual(len(lsrs), 3) + del listener1 + self.assertEqual(len(lsrs), 2) + self.assertEqual(lsrs[0], listener2) + self.assertEqual(lsrs[1], listener3) + del listener2 + self.assertEqual(len(lsrs), 1) + self.assertEqual(lsrs[0], listener3) + del listener3 + self.assertEqual(len(lsrs), 0) + + + def test8_getArgsBadListener(self): + self.assertRaises(ListenerInadequate, getArgs, 1) + try: + getArgs(1) + except ListenerInadequate, exc: + msg = 'Listener "int" (from module "__main__") inadequate: type "int" not supported' + self.assertEqual(str(exc), msg) + + + def test10_weakMethod(self): + class Foo: + def meth(self): + pass + foo = Foo() + wm = WeakMethod(foo.meth) + str(wm) + + + def test11_testNaming(self): + aiMock = ArgsInfoMock() + + # define various type of listeners + def fn(): + pass + class Foo: + def __call__(self): + pass + def meth(self): + pass + + ll = Listener(fn, aiMock) + self.assertEqual(ll.typeName(), "fn") + self.assertEqual(ll.module(), "test_lib_pubsub_listener") + assert not ll.wantsTopicObjOnCall() + + foo = Foo() + ll = Listener(foo, aiMock) + self.assertEqual(ll.typeName(), "Foo") + self.assertEqual(ll.module(), "test_lib_pubsub_listener") + assert not ll.wantsTopicObjOnCall() + + ll = Listener(foo.meth, ArgsInfoMock('argName')) + self.assertEqual(ll.typeName(), "Foo.meth") + self.assertEqual(ll.module(), "test_lib_pubsub_listener") + assert ll.wantsTopicObjOnCall() + +#--------------------------------------------------------------------------- + + +if __name__ == '__main__': + unittest.main() diff --git a/unittests/test_lib_pubsub_notify.py b/unittests/test_lib_pubsub_notify.py new file mode 100644 index 00000000..85a4706b --- /dev/null +++ b/unittests/test_lib_pubsub_notify.py @@ -0,0 +1,161 @@ +""" + +:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved. +:license: BSD, see LICENSE.txt for details. + + +""" + +import imp_unittest, unittest +import wtc + +from difflib import ndiff, unified_diff, context_diff + +# setup notification and logging +from wx.lib.pubsub import pub +from wx.lib.pubsub.utils.notification import useNotifyByWriteFile, INotificationHandler + + +#--------------------------------------------------------------------------- + + +class lib_pubsub_Notify(wtc.WidgetTestCase): + + + def captureStdout(self): + from StringIO import StringIO + capture = StringIO() + useNotifyByWriteFile( fileObj = capture ) + return capture + + + def testNotifyByPrint(self): + capture = self.captureStdout() + + def listener1(arg1): + pass + pub.subscribe(listener1, 'baz') + pub.sendMessage('baz', arg1=123) + pub.unsubscribe(listener1, 'baz') + + def doa(): + def listener2(): + pass + pub.subscribe(listener2, 'bar') + doa() + + pub.delTopic('baz') + + expect = '''\ +PUBSUB: New topic "baz" created +PUBSUB: Subscribed listener "listener1" to topic "baz" +PUBSUB: Start sending message of topic "baz" +PUBSUB: Sending message of topic "baz" to listener listener1 +PUBSUB: Done sending message of topic "baz" +PUBSUB: Unsubscribed listener "listener1" from topic "baz" +PUBSUB: New topic "bar" created +PUBSUB: Subscribed listener "listener2" to topic "bar" +PUBSUB: Listener "listener2" of Topic "bar" has died +PUBSUB: Topic "baz" destroyed + ''' + captured = capture.getvalue() + # strip as other wise one has \n, at least on windows + assert captured.strip() == expect.strip(), \ + '\n'.join( unified_diff(expect.splitlines(), captured.splitlines(), n=0) ) + + + def testFlagChanges(self): + savedFlags = pub.getNotificationFlags() + + pub.setNotificationFlags(all=True, sendMessage=False, deadListener=False) + flags = pub.getNotificationFlags() + assert not flags['sendMessage'] + assert not flags['deadListener'] + assert flags['newTopic'] + assert flags['delTopic'] + assert flags['subscribe'] + assert flags['unsubscribe'] + + pub.setNotificationFlags(subscribe=False, deadListener=True) + flags = pub.getNotificationFlags() + assert not flags['sendMessage'] + assert not flags['subscribe'] + assert flags['newTopic'] + assert flags['delTopic'] + assert flags['deadListener'] + assert flags['unsubscribe'] + + pub.setNotificationFlags(all=False, subscribe=True, unsubscribe=True) + flags = pub.getNotificationFlags() + assert not flags['sendMessage'] + assert not flags['deadListener'] + assert not flags['newTopic'] + assert not flags['delTopic'] + assert flags['subscribe'] + assert flags['unsubscribe'] + + pub.setNotificationFlags(** savedFlags) + + + def testNotifications(self): + class Handler(INotificationHandler): + def __init__(self): + self.resetCounts() + def resetCounts(self): + self.counts = dict(send=0, sub=0, unsub=0, delt=0, newt=0, dead=0, all=0) + def notifySubscribe(self, pubListener, topicObj, newSub): + self.counts['sub'] += 1 + def notifyUnsubscribe(self, pubListener, topicObj): + self.counts['unsub'] += 1 + def notifyDeadListener(self, pubListener, topicObj): + self.counts['dead'] += 1 + def notifySend(self, stage, topicObj, pubListener=None): + if stage == 'pre': self.counts['send'] += 1 + def notifyNewTopic(self, topicObj, description, required, argsDocs): + self.counts['newt'] += 1 + def notifyDelTopic(self, topicName): + self.counts['delt'] += 1 + + notifiee = Handler() + pub.addNotificationHandler(notifiee) + pub.setNotificationFlags(all=True) + + def verify(**ref): + for key, val in notifiee.counts.iteritems(): + if key in ref: + self.assertEqual(val, ref[key], "\n%s\n%s" % (notifiee.counts, ref) ) + else: + self.assertEqual(val, 0, "%s = %s, expected 0" % (key, val)) + notifiee.resetCounts() + + verify() + def testListener(): + pass + def testListener2(): + pass + + pub.getOrCreateTopic('newTopic') + verify(newt=1) + + pub.subscribe(testListener, 'newTopic') + pub.subscribe(testListener2, 'newTopic') + verify(sub=2) + + pub.sendMessage('newTopic') + verify(send=1) + + del testListener + verify(dead=1) + + pub.unsubscribe(testListener2,'newTopic') + verify(unsub=1) + + pub.delTopic('newTopic') + verify(delt=1) + + +#--------------------------------------------------------------------------- + + +if __name__ == '__main__': + unittest.main() diff --git a/unittests/test_lib_pubsub_notify2.py b/unittests/test_lib_pubsub_notify2.py new file mode 100644 index 00000000..f85c5f33 --- /dev/null +++ b/unittests/test_lib_pubsub_notify2.py @@ -0,0 +1,116 @@ +""" + +:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved. +:license: BSD, see LICENSE.txt for details. + + +""" + +import imp_unittest, unittest + +import wtc + +from wx.lib.pubsub import pub +from wx.lib.pubsub.utils.notification import useNotifyByPubsubMessage +topicMgr = pub.getDefaultTopicMgr() + + +#--------------------------------------------------------------------------- + + +class lib_pubsub_Notify2(wtc.WidgetTestCase): + + def test0_NotificationTopics(self): + assert not topicMgr.getTopic('pubsub', okIfNone=True) + useNotifyByPubsubMessage() + assert topicMgr.getTopic('pubsub') + + assert topicMgr.getTopic('pubsub').hasSubtopic() + + pubsubTopicNames = [obj.getName() for obj in topicMgr.getTopic('pubsub').getSubtopics()] + self.assertEqual( + set( pubsubTopicNames ), + set(['pubsub.sendMessage', 'pubsub.deadListener', + 'pubsub.subscribe', 'pubsub.unsubscribe', + 'pubsub.newTopic', 'pubsub.delTopic']) + ) + + + def test1_SubscribeNotify(self): + class MyListener: + countSub = 0 + countUnsub = 0 + def listenerSub(self, msgTopic=pub.AUTO_TOPIC, listener=None, + topic=None, newSub=None): + self.assertEqual(msgTopic.getName(), 'pubsub.subscribe' ) + assert topic.getName() in ('pubsub.unsubscribe', 'testSubscribeNotify') + if newSub: + self.countSub += 1 + def listenerUnsub(self, msgTopic=pub.AUTO_TOPIC, topic=None, + listener=None, listenerRaw=None): + assert topic.getName() in ('testSubscribeNotify', 'pubsub.subscribe' ) + self.assertEqual(msgTopic.getName(), 'pubsub.unsubscribe' ) + if listener is not None: + self.countUnsub += 1 + def listenerTest(self): + raise NotImplementedError # should never get here + + pub.setNotificationFlags(subscribe=True, unsubscribe=True) + pub.getOrCreateTopic('testSubscribeNotify') + tmp = MyListener() + tmp.assertEqual = self.assertEqual + + pub.subscribe(tmp.listenerSub, 'pubsub.subscribe') + self.assertEqual(tmp.countSub, 0) # don't notify of self subscription + self.assertEqual(tmp.countUnsub, 0) + sl, ok = pub.subscribe(tmp.listenerUnsub, 'pubsub.unsubscribe') + assert ok + self.assertEqual(tmp.countSub, 1) + self.assertEqual(tmp.countUnsub, 0) + + pub.subscribe(tmp.listenerTest, 'testSubscribeNotify') + self.assertEqual(tmp.countUnsub, 0) + pub.unsubscribe(tmp.listenerTest, 'testSubscribeNotify') + self.assertEqual(tmp.countUnsub, 1) + + pub.unsubscribe(tmp.listenerSub, 'pubsub.subscribe') + self.assertEqual(tmp.countSub, 2) + self.assertEqual(tmp.countUnsub, 2) + pub.unsubscribe(tmp.listenerUnsub, 'pubsub.unsubscribe') + self.assertEqual(tmp.countSub, 2) + self.assertEqual(tmp.countUnsub, 2) # don't notify of self unsubscription + + + def test2_SendNotify(self): + # trap the pubsub.sendMessage topic: + class SendHandler: + def __init__(self): + self.pre = self.post = 0 + def __call__(self, topic=None, stage=None, listener=None, msgTopic=pub.AUTO_TOPIC): + if stage == 'pre': + self.pre += 1 + else: + self.post += 1 + self.assertEqual(msgTopic.getName(), 'pubsub.sendMessage') + self.assertEqual(topic.getName(), 'testSendNotify') + sh = SendHandler() + sh.assertEqual = self.assertEqual + + pub.subscribe(sh, 'pubsub.sendMessage') + pub.setNotificationFlags(sendMessage=True) + + # generate a message that will cause pubsub.sendMessage to be generated too + assert sh.pre == 0 + assert sh.post == 0 + pub.getOrCreateTopic('testSendNotify') + pub.sendMessage('testSendNotify') + assert sh.pre == 1 + assert sh.post == 1 + + + +#--------------------------------------------------------------------------- + + +if __name__ == '__main__': + unittest.main() diff --git a/unittests/test_lib_pubsub_provider.py b/unittests/test_lib_pubsub_provider.py new file mode 100644 index 00000000..2049aa0c --- /dev/null +++ b/unittests/test_lib_pubsub_provider.py @@ -0,0 +1,260 @@ +""" + +:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved. +:license: BSD, see LICENSE.txt for details. + + +""" + +import imp_unittest, unittest +import wtc + +from textwrap import dedent + +from wx.lib.pubsub import pub + + +class my_topics: + class rootTopic1: + '''Root topic 1''' + + class subtopic_1: + ''' + Sub topic 1 of root topic. Docs rely on one blank line for + topic doc, and indentation for each argument doc. + ''' + + def msgDataSpec(arg1, arg2=None): + ''' + - arg1: some multiline doc + for arg1 + - arg2: some multiline doc + for arg2 + ''' + pass + + class subsubtopic_12: + '''Sub sub topic 2 of sub topic 1.''' + + def msgDataSpec(arg1, argA, arg2=None, argB=None): + ''' + - argA: doc for argA + - argB: doc for argB + ''' + pass + + class rootTopic2: + '''Root topic 2''' + +#--------------------------------------------------------------------------- + + +class lib_pubsub_Except(wtc.WidgetTestCase): + + def test1(self): + + pub.importTopicTree(my_topics) + + + provString = """ + class rootTopic1: + class subtopic_1: + class subsubtopic_11: + ''' + Sub sub topic 1 of sub topic 1. Only need to doc the + extra args. + ''' + def msgDataSpec(arg1, arg3, arg2=None, arg4=None): + ''' + - arg3: doc for arg3 + - arg4: doc for arg4 + ''' + pass + + """ + + pub.importTopicTree(provString, format=pub.TOPIC_TREE_FROM_STRING) + + + provFile = """ + class rootTopic1: + class subtopic_2: + class subsubtopic_21: + '''Sub sub topic 1 of sub topic 2.''' + def msgDataSpec(arg1, arg2=None, someArg=456, arg4=None): + ''' + - arg1: doc for arg1 + - arg2: doc for arg2 + - arg4: doc for arg4 + ''' + pass + """ + + myTopicTree = file('myTopicTree.py', 'w') + myTopicTree.write(dedent(provFile)) + myTopicTree.close() + pub.importTopicTree('myTopicTree', format=pub.TOPIC_TREE_FROM_MODULE, lazy=True) + import os + os.remove('myTopicTree.py') + if os.path.exists('myTopicTree.pyc'): + os.remove('myTopicTree.pyc') + + assert not pub.getTopic('rootTopic1.subtopic_2', okIfNone=True) + # the following should create all topic tree since parent + # topics are automatically created + assert pub.getOrCreateTopic('rootTopic1.subtopic_1.subsubtopic_11') + assert pub.getOrCreateTopic('rootTopic1.subtopic_1.subsubtopic_12') + assert pub.getOrCreateTopic('rootTopic1.subtopic_2.subsubtopic_21') + + # validate that topic specs were properly parsed + def isValid(topicName, listener): + topic = pub.getTopic(topicName) + assert topic.getDescription() + assert topic.isSendable() + return topic.isValid(listener) + + def sub(): + pass + def sub_1(arg1, arg2=123): + pass + def sub_11(arg1, arg3, arg2=None, arg4=None): + pass + assert isValid('rootTopic1', sub) + assert isValid('rootTopic1.subtopic_1', sub_1) + assert isValid('rootTopic1.subtopic_1.subsubtopic_11', sub_11) + # no providers have spec for subtopic_2 + assert not pub.getTopic('rootTopic1.subtopic_2').isSendable() + + #printTreeSpec() + + pub.exportTopicTree('newTopicTree') + root2Defn = pub.exportTopicTree(rootTopicName='rootTopic1') + + import os + os.remove('newTopicTree.py') + if os.path.exists('newTopicTree.pyc'): + os.remove('newTopicTree.pyc') + + + def test2_import_export_no_change(self): + # + # Test that import/export/import does not change the import + # + + importStr = """ + '''Tree docs, can be anything you want.''' + + class test_import_export_no_change: + '''Root topic 1.''' + + class subtopic_1: + ''' + Sub topic 1 of root topic. Docs rely on one + blank line for topic doc, and indentation for + each argument doc. + ''' + + def msgDataSpec(arg1, arg2=None): + ''' + - arg1: some multiline doc + for arg1 + - arg2: some multiline doc + for arg2 + ''' + pass + """ + pub.clearTopicDefnProviders() + treeDoc = pub.importTopicTree(importStr, lazy = True, + format = pub.TOPIC_TREE_FROM_STRING) + assert treeDoc == '''Tree docs, can be anything you want.''' + root = pub.getOrCreateTopic('test_import_export_no_change.subtopic_1') + # few sanity checks + def sub_1(arg1, arg2=None): + pass + assert root.isSendable() + assert pub.isValid(sub_1, 'test_import_export_no_change.subtopic_1') + + # export tree + exported = pub.exportTopicTree(rootTopicName='test_import_export_no_change', moduleDoc=treeDoc) + #print exported + + expectExport = """\ + # Automatically generated by TopicTreeAsSpec(**kwargs). + # The kwargs were: + # - fileObj: StringIO + # - width: 70 + # - treeDoc: 'Tree docs, can be anything you want....' + # - indentStep: 4 + # - footer: '# End of topic tree definition. Note that application may l...' + + + ''' + Tree docs, can be anything you want. + ''' + + + class test_import_export_no_change: + ''' + Root topic 1. + ''' + + class subtopic_1: + ''' + Sub topic 1 of root topic. Docs rely on one + blank line for topic doc, and indentation for + each argument doc. + ''' + + def msgDataSpec(arg1, arg2=None): + ''' + - arg1: some multiline doc + for arg1 + - arg2: some multiline doc + for arg2 + ''' + + + # End of topic tree definition. Note that application may load + # more than one definitions provider. + """ + + # check there are no differences + from difflib import context_diff, ndiff + diffs = ndiff( dedent(expectExport).splitlines(), exported.splitlines()) + diffs = [d for d in diffs if not d.startswith(' ')] + #print '\n'.join(diffs) + assert diffs == ['- ', '+ '] + + # now for module: + modDoc = pub.importTopicTree('lib_pubsub_provider_expect', + format=pub.TOPIC_TREE_FROM_MODULE, + lazy=False) + assert modDoc.startswith('\nTree docs, can be anything you') + pub.exportTopicTree('lib_pubsub_provider_actual', + rootTopicName='test_import_export_no_change2', + moduleDoc=treeDoc) + lines1 = file('lib_pubsub_provider_actual.py', 'r').readlines() + lines2 = file('lib_pubsub_provider_expect.py', 'r').readlines() + diffs = context_diff( lines1, lines2 ) + assert not list(diffs) + + def test_module_as_class(self): + assert pub.getTopic('root_topic1', True) is None + assert pub.getTopic('root_topic2.sub_topic21', True) is None + + import lib_pubsub_provider_my_import_topics + pub.importTopicTree(lib_pubsub_provider_my_import_topics) + + assert pub.getTopic('root_topic1') is not None + assert pub.getTopic('root_topic2.sub_topic21') is not None + + pub.sendMessage(lib_pubsub_provider_my_import_topics.root_topic1) + + +#--------------------------------------------------------------------------- + + +if __name__ == '__main__': + unittest.main() + + \ No newline at end of file diff --git a/unittests/test_lib_pubsub_spec.py b/unittests/test_lib_pubsub_spec.py new file mode 100644 index 00000000..a71766f6 --- /dev/null +++ b/unittests/test_lib_pubsub_spec.py @@ -0,0 +1,132 @@ +""" + +:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved. +:license: BSD, see LICENSE.txt for details. + +""" + +import imp_unittest, unittest +import wtc + + +from wx.lib.pubsub.core.topicargspec import \ + ArgsInfo, \ + ArgSpecGiven, \ + SenderMissingReqdArgs, \ + SenderUnknownOptArgs + + +#--------------------------------------------------------------------------- + + +class lib_pubsub_Specs(wtc.WidgetTestCase): + + + def test1_create(self): + # root + td1 = ArgSpecGiven(dict()) + ai1 = ArgsInfo(('t1',), td1, None) + assert ai1.isComplete() + assert ai1.numArgs() == 0 + assert ai1.getArgs() == () + assert ai1.getCompleteAI() is ai1 + + # sub, complete + td2 = ArgSpecGiven( + argsDocs = dict(arg1='doc for arg1', arg2='doc for arg2'), + reqdArgs = ('arg2',)) + ai2 = ArgsInfo(('t1','st1'), td2, ai1) + assert ai2.isComplete() + assert ai2.numArgs() == 2 + assert ai2.getArgs() == ('arg1', 'arg2') + assert ai2.getCompleteAI() is ai2 + + # sub, missing + td2.argsSpecType = ArgSpecGiven.SPEC_GIVEN_NONE + ai4 = ArgsInfo(('t1','st3'), td2, ai1) + assert not ai4.isComplete() + assert ai4.numArgs() == 0 + assert ai4.getArgs() == () + assert ai4.getCompleteAI() is ai1 + + # sub, of incomplete spec, given ALL args + td3 = ArgSpecGiven( + argsDocs = dict(arg1='doc for arg1', arg2='doc for arg2'), + reqdArgs = ('arg2',)) + ai5 = ArgsInfo(('t1','st3','sst1'), td3, ai4) + assert ai5.isComplete() + assert ai5.numArgs() == 2 + assert ai5.hasSameArgs('arg1', 'arg2') + assert ai5.getCompleteAI() is ai5 + + def test2_update(self): + td1 = ArgSpecGiven(dict()) + td2 = ArgSpecGiven() + td4 = ArgSpecGiven() + td5 = ArgSpecGiven( + argsDocs = dict( + arg1='doc for arg1', arg2='doc for arg2', + arg3='doc for arg3', arg4='doc for arg4'), + reqdArgs = ('arg4','arg2')) + + ai1 = ArgsInfo(('t1',), td1, None) # root, complete + ai2 = ArgsInfo(('t1','st1'), td2, ai1) # sub 1, empty + ai4 = ArgsInfo(('t1','st1','sst2'), td4, ai2) # empty sub of sub 1 + ai5 = ArgsInfo(('t1','st1','sst3'), td5, ai2) # completed sub of sub 1 + + # check assumptions before we start: + assert not ai2.isComplete() + assert not ai4.isComplete() + assert ai5.isComplete() + assert ai2.numArgs() == 0 + assert ai4.numArgs() == 0 + assert ai5.numArgs() == 4 + + # pretend we have an update for ai2: all args now available + ai2.updateAllArgsFinal( ArgSpecGiven( + dict(arg1='doc for arg1', arg2='doc for arg2'), + ('arg2',)) ) + assert ai2.isComplete() + assert ai2.numArgs() == 2 + assert ai2.hasSameArgs('arg1', 'arg2') + assert ai2.getCompleteAI() is ai2 + + assert not ai4.isComplete() + + assert ai2.numArgs() == 2 + assert ai4.numArgs() == 0 + assert ai5.numArgs() == 4 + + assert ai4.getCompleteAI() is ai2 + + assert ai2.hasSameArgs('arg1', 'arg2') + assert ai5.hasSameArgs('arg1', 'arg2', 'arg3', 'arg4') + + def test3_filter(self): + td = ArgSpecGiven( + argsDocs = dict(arg1='doc for arg1', arg2='doc for arg2'), + reqdArgs = ('arg2',)) + ai = ArgsInfo(('t1',), td, None) + + # check: + argsMissingReqd = {} + self.assertRaises(SenderMissingReqdArgs, ai.check, argsMissingReqd) + + argsExtraOpt = dict(arg2=2, arg5=5) + self.assertRaises(SenderUnknownOptArgs, ai.check, argsExtraOpt) + + args = dict(arg1=1, arg2=2) + ai.check(args) + + # filter: + msgArgs = dict(arg1=1, arg2=2) + argsOK = msgArgs.copy() + assert ai.filterArgs( msgArgs ) == argsOK + msgArgs.update(arg3=3, arg4=4) + assert ai.filterArgs( msgArgs ) == argsOK + +#--------------------------------------------------------------------------- + + +if __name__ == '__main__': + unittest.main() diff --git a/unittests/test_lib_pubsub_topic.py b/unittests/test_lib_pubsub_topic.py new file mode 100644 index 00000000..ab94d4a4 --- /dev/null +++ b/unittests/test_lib_pubsub_topic.py @@ -0,0 +1,168 @@ +""" + +:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved. +:license: BSD, see LICENSE.txt for details. + + +""" + +import imp_unittest, unittest +import wtc + +from wx.lib.pubsub.core.topicobj import Topic +from wx.lib.pubsub.core.treeconfig import TreeConfig +from wx.lib.pubsub.core.topicutils import ALL_TOPICS +from wx.lib.pubsub.core.topicargspec import ArgsInfo, ArgSpecGiven +from wx.lib.pubsub.core.listener import ListenerInadequate +from wx.lib.pubsub.core.topicexc import ListenerSpecInvalid + + + +#--------------------------------------------------------------------------- + + +class lib_pubsub_Topic(wtc.WidgetTestCase): + + rootTopic = None + treeConfig = TreeConfig() + + def test0_CreateRoot(self): + # + # Test create and then modify state of a topic object + # + + nameTuple = ('root',) + description = 'root description' + msgArgsInfo = None + + # when parent is None, only nameTuple=ALL_TOPICS is allowed, thereby + # guaranteeing that only one tree root can be created + self.assertRaises(ValueError, Topic, self.treeConfig, nameTuple, description, msgArgsInfo) + + # create the ALL TOPICS topic; it has no message args + nameTuple = (ALL_TOPICS,) + argSpec = ArgSpecGiven(dict() ) + msgArgsInfo = ArgsInfo(nameTuple, argSpec, None) + obj = Topic(self.treeConfig, nameTuple, description, msgArgsInfo) + + # verify its state is as expected after creation: + assert obj.getListeners() == [] + assert obj.getNumListeners() == 0 + assert obj.hasListeners() == False + + def listener1(): + pass + def listener2(): + pass + def badListener1(arg1): + pass # extra required arg + def badListener2(arg1=None): + pass # extra is optional + assert obj.isValid(listener1) + assert not obj.isValid(badListener1) + assert not obj.isValid(badListener2) + + self.rootTopic = obj + + + def test1_SubUnsub(self): + # + # Test subscription and unsubscription of listeners + # + + def listener1(): + pass + def listener2(): + pass + # need to run this here again to get rootTopic setup for this test + self.test0_CreateRoot() + obj = self.rootTopic + + # now modify its state by subscribing listeners + obj.subscribe(listener1) + obj.subscribe(listener2) + + obj.hasListener(listener1) + obj.hasListener(listener2) + assert obj.hasListeners() == True + assert set(obj.getListeners()) == set([listener1, listener2]) + assert obj.getNumListeners() == 2 + + # try to subscribe an invalid listener + def badListener(arg1): + pass # extra required arg + self.assertRaises(ListenerInadequate, obj.subscribe, badListener) + + # try unsubscribe + obj.unsubscribe(listener1) + assert obj.hasListeners() == True + assert obj.getListeners() == [listener2] + assert obj.getNumListeners() == 1 + + # try unsubscribe all, with filtering + obj.subscribe(listener1) + def listener3(): pass + obj.subscribe(listener3) + assert obj.getNumListeners() == 3 + def ff(listener): + # use != since it is defined in terms of ==; also, put listener + # on RHS to verify works even when Listener used on RHS + return listener2 != listener + obj.unsubscribeAllListeners(filter=ff) + assert obj.getNumListeners() == 1 + assert obj.getListeners() == [listener2] + obj.subscribe(listener1) + obj.subscribe(listener3) + assert obj.getNumListeners() == 3 + obj.unsubscribeAllListeners() + assert obj.getNumListeners() == 0 + + + def test2_CreateChild(self): + # + # Test creation of a child topic, subscription of listeners + # + + # need to run this here again to get rootTopic setup for this test + self.test0_CreateRoot() + + nameTuple = ('childOfAll',) + description = 'child description' + argsDocs = dict(arg1='arg1 desc', arg2='arg2 desc') + reqdArgs = ('arg2',) + argSpec = ArgSpecGiven(argsDocs=argsDocs, reqdArgs = reqdArgs) + msgArgsInfo = ArgsInfo(nameTuple, argSpec, self.rootTopic._getListenerSpec()) + parent = Topic(self.treeConfig, nameTuple, description, msgArgsInfo, + parent=self.rootTopic) + assert parent.getParent() is self.rootTopic + + # now create a child of child with wrong arguments so we can test exceptions + nameTuple = ('childOfAll', 'grandChild') + description = 'grandchild description' + + def tryCreate(ad, r): + argSpec = ArgSpecGiven(argsDocs=ad, reqdArgs = r) + msgArgsInfo = ArgsInfo(nameTuple, argSpec, parent._getListenerSpec()) + obj = Topic(self.treeConfig, nameTuple, description, msgArgsInfo, + parent=parent) + + # test when all OK + argsDocs = dict(arg1='arg1 desc', arg2='arg2 desc') + reqdArgs = ('arg2',) + tryCreate(argsDocs, reqdArgs) + # test when requiredArg wrong + reqdArgs = ('arg3',) + self.assertRaises(ListenerSpecInvalid, tryCreate, argsDocs, reqdArgs) + reqdArgs = () + self.assertRaises(ListenerSpecInvalid, tryCreate, argsDocs, reqdArgs) + # test when missing opt arg + argsDocs = dict(arg1='arg1 desc', arg2='arg2 desc') + reqdArgs = ('arg2',) + + +#--------------------------------------------------------------------------- + + +if __name__ == '__main__': + unittest.main() + diff --git a/unittests/test_lib_pubsub_topicmgr.py b/unittests/test_lib_pubsub_topicmgr.py new file mode 100644 index 00000000..1930f526 --- /dev/null +++ b/unittests/test_lib_pubsub_topicmgr.py @@ -0,0 +1,426 @@ +""" + +:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved. +:license: BSD, see LICENSE.txt for details. + + +""" + +import imp_unittest, unittest +import wtc + +from wx.lib.pubsub import pub + +from wx.lib.pubsub.pub import \ + ALL_TOPICS, \ + ListenerSpecInvalid, \ + ITopicDefnProvider, \ + TopicTreeTraverser, \ + UndefinedTopic, \ + UndefinedSubtopic, \ + ListenerNotValidatable + +from wx.lib.pubsub.core.topicmgr import \ + ArgSpecGiven + +from wx.lib.pubsub.core.topicutils import \ + TopicNameInvalid, \ + validateName + +topicMgr = pub.getDefaultTopicMgr() + +from wx.lib.pubsub.utils.topictreeprinter import \ + printTreeDocs, ITopicTreeVisitor + + +class lib_pubsub_TopicMgr0_Basic(wtc.WidgetTestCase): + """ + Only tests TopicMgr methods. This must use some query methods on + topic objects to validate that TopicMgr did it's job properly. + """ + + def failTopicName(self, name): + self.assertRaises(TopicNameInvalid, validateName, name) + + def test10_GoodTopicNames(self): + # + # Test that valid topic names are accepted by pubsub' + # + + validateName('test.asdf') + validateName('test.a') + validateName('test.a.b') + + def test10_BadTopicNames(self): + # + # Test that invalid topic names are rejected by pubsub + # + + # parts of topic name are 'empty' + self.failTopicName( '' ) + self.failTopicName( ('',) ) + self.failTopicName( ('test','asdf','') ) + self.failTopicName( ('test','a', None) ) + + # parts of topic name have invalid char + self.failTopicName( ('test','a','b','_') ) + self.failTopicName( ('(aa',) ) + + self.failTopicName( (ALL_TOPICS,) ) + + +class lib_pubsub_TopicMgr1_GetOrCreate_NoDefnProv(wtc.WidgetTestCase): + """ + Only tests TopicMgr methods. This must use some query methods on + topic objects to validate that TopicMgr did it's job properly. + """ + + def test10_NoProtoListener(self): + # + # Test the getOrCreateTopic without proto listener + # + + def verifyNonSendable(topicObj, nameTuple, parent): + """Any non-sendable topic will satisfy these conditions:""" + self.assertEqual(0, topicMgr.isTopicSpecified(nameTuple)) + assert not topicObj.isSendable() + assert topicObj.getListeners() == [] + assert topicObj.getNameTuple() == nameTuple + assert topicObj.getNumListeners() == 0 + assert topicObj.getParent() is parent + assert topicObj.getNodeName() == topicObj.getNameTuple()[-1] + def foobar(): + pass + assert not topicObj.hasListener(foobar) + assert not topicObj.hasListeners() + assert not topicObj.hasSubtopic('asdfafs') + assert not topicObj.isAll() + self.assertRaises(ListenerNotValidatable, topicObj.isValid, foobar) + self.assertRaises(ListenerNotValidatable, topicObj.validate, foobar) + # check that getTopic and getOrCreateTopic won't create again: + assert topicMgr.getOrCreateTopic(nameTuple) is topicObj + assert topicMgr.getTopic(nameTuple) is topicObj + + # test with a root topic + rootName = 'GetOrCreate_NoProtoListener' + tName = rootName + # verify doesn't exist yet + assert topicMgr.getTopic(tName, True) is None + # ok create it, unsendable + rootTopic = topicMgr.getOrCreateTopic(tName) + verifyNonSendable(rootTopic, (rootName,), topicMgr.getRootTopic()) + DESC_NO_SPEC = 'UNDOCUMENTED: created without spec' + assert rootTopic.getDescription() == DESC_NO_SPEC + assert rootTopic.isRoot() + assert rootTopic.getSubtopics() == [] + assert not rootTopic.isAll() + assert not rootTopic.hasSubtopic() + + # test with a subtopic + tName1 = (rootName, 'stB') + tName2 = tName1 + ('sstC',) + assert topicMgr.getTopic(tName1, True) is None + assert topicMgr.getTopic(tName2, True) is None + subsubTopic = topicMgr.getOrCreateTopic(tName2) + # verify that parent was created implicitly + subTopic = topicMgr.getTopic(tName1) + verifyNonSendable(subTopic, tName1, rootTopic) + verifyNonSendable(subsubTopic, tName2, subTopic) + assert subsubTopic.getDescription() == DESC_NO_SPEC + DESC_PARENT_NO_SPEC = 'UNDOCUMENTED: created as parent without specification' + assert subTopic.getDescription() == DESC_PARENT_NO_SPEC + assert rootTopic.getSubtopics() == [subTopic] + assert rootTopic.hasSubtopic() + assert subTopic.getSubtopics() == [subsubTopic] + assert subTopic.hasSubtopic() + assert subsubTopic.getSubtopics() == [] + assert not subsubTopic.hasSubtopic() + + # check that getTopic raises expected exception when undefined topic: + tName = 'Undefined' + self.assertRaises(UndefinedTopic, topicMgr.getTopic, tName) + tName = rootName + '.Undefined' + self.assertRaises(UndefinedSubtopic, topicMgr.getTopic, tName) + + def test20_WithProtoListener(self): + # + # Test the getOrCreateTopic with proto listener + # + + rootName = 'GetOrCreate_WithProtoListener' + tName = rootName + # verify doesn't exist yet + assert topicMgr.getTopic(tName, True) is None + def protoListener(arg1, arg2=None): + pass + # ok create it, sendable + rootTopic = topicMgr.getOrCreateTopic(tName, protoListener) + # check that getTopic and getOrCreateTopic won't create again: + assert topicMgr.getOrCreateTopic(tName) is rootTopic + assert topicMgr.getTopic(tName) is rootTopic + assert rootTopic.isSendable() + assert topicMgr.isTopicSpecified(tName) + expectDesc = 'UNDOCUMENTED: created from protoListener "protoListener" in module test_lib_pubsub_topicmgr' + assert rootTopic.getDescription() == expectDesc + + # check that topic created can discern between good and bad listener + assert rootTopic.isValid(protoListener) + def badListener1(): + pass # missing required arg + def badListener2(arg2): + pass # opt arg is required + def badListener3(arg1, arg3): + pass # extra required arg + assert not rootTopic.isValid(badListener1) + assert not rootTopic.isValid(badListener2) + assert not rootTopic.isValid(badListener3) + + # verify that missing parent created is not sendable, child is + def protoListener2(arg1, arg2=None): + pass + tName = (tName, 'stA', 'sstB') + subsubTopic = topicMgr.getOrCreateTopic(tName, protoListener2) + subTopic = topicMgr.getTopic( tName[:-1] ) + assert not topicMgr.isTopicSpecified( tName[:-1] ) + assert topicMgr.isTopicSpecified( tName ) + assert subsubTopic.isValid(protoListener2) + + +class lib_pubsub_TopicMgr2_GetOrCreate_DefnProv(wtc.WidgetTestCase): + """ + Test TopicManager when one or more definition providers + can provide for some topic definitions. + """ + + def test10_DefnProvider(self): + # + # Test the addition and clearing of definition providers + # + + class DefnProvider: + pass + dp1 = DefnProvider() + dp2 = DefnProvider() + assert 1 == topicMgr.addDefnProvider(dp1) + assert 1 == topicMgr.addDefnProvider(dp1) + assert 2 == topicMgr.addDefnProvider(dp2) + assert 2 == topicMgr.addDefnProvider(dp2) + assert 2 == topicMgr.addDefnProvider(dp1) + topicMgr.clearDefnProviders() + assert 0 == topicMgr.getNumDefnProviders() + assert 1 == topicMgr.addDefnProvider(dp1) + topicMgr.clearDefnProviders() + + def test20_UseProvider(self): + # + # Test the use of definition providers for topics. We create + # two so we can check that more than one can work together. + # One provides good definitions, one provides some with errors. + # + + class DefnProvider(ITopicDefnProvider): + """ + Provide definitions for a root topic, subtopic, and + one subtopic whose parent is not defined here. It is easier + to use sub-only definitions. + """ + def __init__(self): + self.defns = { + ('a',) : (dict(arg1='arg1 desc', arg2='arg2 desc'), + ('arg1',) ), + ('a', 'b') : (dict(arg1='arg1 desc', arg2='arg2 desc', + arg3='arg3 desc', arg4='arg2 desc'), + ('arg1', 'arg3',) ), + # parent doesn't have defn + ('a', 'c', 'd') : ( + dict(arg1='arg1 desc', arg2='arg2 desc', + arg3='arg3 desc', arg4='arg4 desc', + arg5='arg5 desc', arg6='arg6 desc'), + ('arg1', 'arg3', 'arg5',)), + } + + def getDefn(self, topicNameTuple): + if topicNameTuple not in self.defns: + return None, None + defn = ArgSpecGiven() + defn.setAll( * self.defns[topicNameTuple] ) + desc = '%s desc' % '.'.join(topicNameTuple) + return desc, defn + + class DefnProviderErr(ITopicDefnProvider): + """ + Provide some definitions that have wrong arg spec. It is + easier to use the 'all-spec' for definitions, which provides + an opportunity for a different method of ArgSpecGiven. + """ + def __init__(self): + self.defns = { + ('a', 'err1') : (# missing arg2 + dict(arg1=''), + ('arg1',) ), + ('a', 'err2') : (# missing arg1 + dict(arg2=''), ), + ('a', 'err3') : (# arg1 is no longer required + dict(arg1='', arg2=''), ), + } + + def getDefn(self, topicNameTuple): + if topicNameTuple not in self.defns: + return None, None + defn = ArgSpecGiven() + defn.setAll( * self.defns[topicNameTuple] ) + desc = '%s desc' % '.'.join(topicNameTuple) + return desc, defn + + topicMgr.addDefnProvider( DefnProvider() ) + topicMgr.addDefnProvider( DefnProviderErr() ) + + # create some topics that will use defn provider + topic = topicMgr.getOrCreateTopic('a') + assert topic.getDescription() == 'a desc' + assert topic.isSendable() + topic = topicMgr.getOrCreateTopic('a.b') + assert topic.getDescription() == 'a.b desc' + assert topic.isSendable() + topic = topicMgr.getOrCreateTopic('a.c.d') + assert topic.getDescription() == 'a.c.d desc' + assert topic.isSendable() + assert not topicMgr.isTopicSpecified('a.c') + # check + parent = topicMgr.getTopic('a.c') + assert not parent.isSendable() + def protoListener(arg1, arg3, arg2=None, arg4=None): pass + parent = topicMgr.getOrCreateTopic('a.c', protoListener) + assert parent.isSendable() + assert topic.isSendable() + + # now the erroneous ones: + def testRaises(topicName, expectMsg): + self.assertRaises(ListenerSpecInvalid, topicMgr.getOrCreateTopic, + topicName) + try: + assert topicMgr.getOrCreateTopic(topicName) is None + except ListenerSpecInvalid, exc: + # ok, did raise but is it correct message? + try: + str(exc).index(expectMsg) + except ValueError: + msg = 'Wrong message, expected \n "%s", got \n "%s"' + raise RuntimeError(msg % (expectMsg, str(exc)) ) + + testRaises('a.err1', 'Params [arg1] missing inherited [arg2] for topic "a.err1"') + testRaises('a.err2', 'Params [arg2] missing inherited [arg1] for topic "a.err2"') + testRaises('a.err3', 'Params [] missing inherited [arg1] for topic "a.err3" required args') + + + def test30_DelTopic(self): + # + # Test topic deletion + # + + topicMgr.getOrCreateTopic('delTopic.b.c.d.e') + assert topicMgr.getTopic('delTopic.b.c.d.e') is not None + assert topicMgr.getTopic('delTopic.b.c.d').hasSubtopic('e') + + assert topicMgr.getTopic('delTopic.b').hasSubtopic('c') + topicMgr.delTopic('delTopic.b.c') + assert not topicMgr.getTopic('delTopic.b').hasSubtopic('c') + assert topicMgr.getTopic('delTopic.b.c.d.e', okIfNone=True) is None + assert topicMgr.getTopic('delTopic.b.c.d', okIfNone=True) is None + assert topicMgr.getTopic('delTopic.b.c', okIfNone=True) is None + + +class lib_pubsub_TopicMgr3_TreeTraverser(wtc.WidgetTestCase): + expectedOutput = '''\ +\-- Topic "a2" + \-- Topic "a" + \-- Topic "a" + \-- Topic "b" + \-- Topic "b" + \-- Topic "a" + \-- Topic "b"''' + + + def test1(self): + # + # Test printing of topic tree + # + + root = topicMgr.getOrCreateTopic('a2') + topicMgr.getOrCreateTopic('a2.a.a') + topicMgr.getOrCreateTopic('a2.a.b') + topicMgr.getOrCreateTopic('a2.b.a') + topicMgr.getOrCreateTopic('a2.b.b') + + from StringIO import StringIO + buffer = StringIO() + printTreeDocs(rootTopic=root, width=70, fileObj=buffer) + self.assertEqual( buffer.getvalue(), self.expectedOutput ) + + def test2(self): + # + # Test traversing with and without filtering, breadth and depth + # + class MyTraverser(ITopicTreeVisitor): + def __init__(self): + self.traverser = pub.TopicTreeTraverser(self) + self.calls = '' + self.topics = [] + + def traverse(self, rootTopic, **kwargs): + self.traverser.traverse(rootTopic, **kwargs) + + def __append(self, val): + self.calls = self.calls + str(val) + + def _startTraversal(self): + self.__append(1) + + def _accept(self, topicObj): + self.__append(2) + # only accept topics at root or second level tree, or if tailName() is 'A' + return len(topicObj.getNameTuple()) <= 2 or topicObj.getNodeName() == 'A' + + def _onTopic(self, topicObj): + self.__append(3) + self.topics.append(topicObj.getNodeName()) + + def _startChildren(self): + self.__append(4) + + def _endChildren(self): + self.__append(5) + + def _doneTraversal(self): + self.__append(6) + + root = topicMgr.getOrCreateTopic('traversal') + topicMgr.getOrCreateTopic('traversal.a.A') + topicMgr.getOrCreateTopic('traversal.a.B.foo') + topicMgr.getOrCreateTopic('traversal.b.C') + topicMgr.getOrCreateTopic('traversal.b.D.bar') + + def exe(expectCalls, expectTopics, **kwargs): + traverser = MyTraverser() + traverser.traverse(root, **kwargs) + self.assertEqual(traverser.topics, expectTopics) + self.assertEqual(traverser.calls, expectCalls) + + exe(expectCalls = '13434345343455534345343455556', + expectTopics = ['traversal', 'a', 'A', 'B', 'foo', 'b', 'C', 'D', 'bar'], + onlyFiltered = False) + exe(expectCalls = '13433543354335454354543545456', + expectTopics = ['traversal', 'a', 'b', 'A', 'B', 'C', 'D', 'foo', 'bar'], + how = TopicTreeTraverser.BREADTH, onlyFiltered = False) + exe(expectCalls = '123423423452523422556', + expectTopics = ['traversal','a','A','b']) + exe(expectCalls = '123423235423254225456', + expectTopics = ['traversal','a','b','A'], + how = TopicTreeTraverser.BREADTH) + + +#--------------------------------------------------------------------------- + + +if __name__ == '__main__': + unittest.main()