Add pubsub unittests, adapted by Werner

git-svn-id: https://svn.wxwidgets.org/svn/wx/wxPython/Phoenix/trunk@73830 c3d73ce0-8a6f-49c7-b76d-6d57e0e08775
This commit is contained in:
Robin Dunn
2013-04-20 21:53:50 +00:00
parent 11761fb1c9
commit d0e440a304
14 changed files with 2204 additions and 0 deletions

View File

@@ -0,0 +1,7 @@
def getRaisingListener():
def raisingListener():
def nested():
raise RuntimeError2('test')
nested()
return raisingListener

View File

@@ -0,0 +1,36 @@
# Automatically generated by TopicTreeAsSpec(**kwargs).
# The kwargs were:
# - fileObj: file
# - width: 70
# - treeDoc: 'Tree docs, can be anything you want....'
# - indentStep: 4
# - footer: '# End of topic tree definition. Note that application may l...'
'''
Tree docs, can be anything you want.
'''
class test_import_export_no_change2:
'''
Root topic 1.
'''
class subtopic_1:
'''
Sub topic 1 of root topic. Docs rely on one blank line for
topic doc, and indentation for each argument doc.
'''
def msgDataSpec(arg1, arg2=None):
'''
- arg1: some multiline doc
for arg1
- arg2: some multiline doc
for arg2
'''
# End of topic tree definition. Note that application may load
# more than one definitions provider.

View File

@@ -0,0 +1,36 @@
# Automatically generated by TopicTreeAsSpec(**kwargs).
# The kwargs were:
# - fileObj: file
# - width: 70
# - treeDoc: 'Tree docs, can be anything you want....'
# - indentStep: 4
# - footer: '# End of topic tree definition. Note that application may l...'
'''
Tree docs, can be anything you want.
'''
class test_import_export_no_change2:
'''
Root topic 1.
'''
class subtopic_1:
'''
Sub topic 1 of root topic. Docs rely on one blank line for
topic doc, and indentation for each argument doc.
'''
def msgDataSpec(arg1, arg2=None):
'''
- arg1: some multiline doc
for arg1
- arg2: some multiline doc
for arg2
'''
# End of topic tree definition. Note that application may load
# more than one definitions provider.

View File

@@ -0,0 +1,14 @@
class root_topic1:
'docs for root_topic1'
def msgDataSpec():
pass
class sub_topic11:
'docs for sub_topic11'
class root_topic2:
'docs for root_topic2'
class sub_topic21:
'docs for sub_topic21'

View File

@@ -0,0 +1,313 @@
"""
Except for one test, this file tests with auto-creation of topics
disabled, as it is more rigorous for testing purposes.
:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved.
:license: BSD, see LICENSE.txt for details.
"""
import imp_unittest, unittest
import wtc
from wx.lib.pubsub import pub
from wx.lib.pubsub.utils.notification import IgnoreNotificationsMixin
#---------------------------------------------------------------------------
class lib_pubsub_Except(wtc.WidgetTestCase):
def testDOAListenerPubsub(self):
# Verify that a 'temporary' listener (one that will be garbage collected
# as soon as subscribe() returns because there are no strong references to
# it) gets immediately unregistered
def listener():
pass
class Wrapper:
def __init__(self, func):
self.func = func
def __call__(self):
pass
pub.subscribe( Wrapper(listener), 'testDOAListenerPubsub')
assert not pub.getTopic('testDOAListenerPubsub').hasListeners()
assert pub.isValid(listener, 'testDOAListenerPubsub')
def testDeadListener(self):
# create a listener for listeners that have died
class DeathListener(IgnoreNotificationsMixin):
listenerStr = ''
def notifyDeadListener(self, pubListener, topicObj):
self.assertEqual(topicObj.getName(), 'sadTopic')
DeathListener.listenerStr = pubListener.name()
dl = DeathListener()
dl.assertEqual = self.assertEqual
pub.addNotificationHandler(dl)
pub.setNotificationFlags(deadListener=True)
# define a topic, subscribe to it, and kill its listener:
class TempListener:
def __call__(self, **kwargs):
pass
def __del__(self):
pass #print 'being deleted'
tempListener = TempListener()
expectLisrStr, _ = pub.getListenerID(tempListener)
pub.subscribe(tempListener, 'sadTopic')
del tempListener
# verify:
assert DeathListener.listenerStr.startswith(expectLisrStr), \
'"%s" !~ "%s"' % (DeathListener.listenerStr, expectLisrStr)
pub.addNotificationHandler(None)
pub.clearNotificationHandlers()
def testSubscribe(self):
topicName = 'testSubscribe'
def proto(a, b, c=None):
pass
pub.getOrCreateTopic(topicName, proto)
def listener(a, b, c=None): pass
# verify that pub.isValid() works too
pub.validate(listener, topicName)
assert pub.isValid(listener, topicName)
self.assertEqual(pub.getTopic(topicName).getNumListeners(), 0)
self.assertEqual(pub.getAssociatedTopics(listener), [])
assert not pub.isSubscribed(listener, topicName)
assert pub.subscribe(listener, topicName)
assert pub.isSubscribed(listener, topicName)
def topicNames(listener):
return [t.getName() for t in pub.getAssociatedTopics(listener)]
self.assertEqual(topicNames(listener), [topicName])
# should do nothing if already subscribed:
assert not pub.subscribe(listener, topicName)[1]
self.assertEqual(pub.getTopic(topicName).getNumListeners(), 1)
# test pub.getAssociatedTopics()
pub.subscribe(listener, 'lt2', )
self.assertEqual(set(topicNames(listener)),
set([topicName,'lt2']))
pub.subscribe(listener, 'lt1.lst1')
self.assertEqual(set(topicNames(listener)),
set([topicName,'lt2','lt1.lst1']))
# test ALL_TOPICS
def listenToAll():
pass
pub.subscribe(listenToAll, pub.ALL_TOPICS)
self.assertEqual(topicNames(listenToAll), [pub.ALL_TOPICS])
def testMissingReqdArgs(self):
def proto(a, b, c=None):
pass
pub.getOrCreateTopic('missingReqdArgs', proto)
self.assertRaises(pub.SenderMissingReqdArgs, pub.sendMessage,
'missingReqdArgs', a=1)
def testSendTopicWithMessage(self):
class MyListener:
def __init__(self):
self.count = 0
self.heardTopic = False
self.listen2Topics = []
def listen0(self):
pass
def listen1(self, **kwarg):
self.count += 1
self.heardTopic = True
def listen2(self, msgTopic=pub.AUTO_TOPIC, **kwarg):
self.listen2Topics.append(msgTopic.getName())
my = MyListener()
pub.subscribe(my.listen0, 'testSendTopic')
pub.subscribe(my.listen1, 'testSendTopic')
pub.subscribe(my.listen2, 'testSendTopic')
pub.sendMessage('testSendTopic')
self.assertEqual(my.count, 1)
self.assertEqual(my.heardTopic, True)
pub.subscribe(my.listen0, 'testSendTopic.subtopic')
pub.subscribe(my.listen1, 'testSendTopic.subtopic')
pub.subscribe(my.listen2, 'testSendTopic.subtopic')
pub.sendMessage('testSendTopic.subtopic')
self.assertEqual(my.count, 3)
self.assertEqual([], [topic for topic in my.listen2Topics
if topic not in ('testSendTopic', 'testSendTopic.subtopic')] )
def testAcceptAllArgs(self):
def listen(arg1=None):
pass
def listenAllArgs(arg1=None, **kwargs):
pass
def listenAllArgs2(arg1=None, msgTopic=pub.AUTO_TOPIC, **kwargs):
pass
pub.subscribe(listen, 'testAcceptAllArgs')
pub.subscribe(listenAllArgs, 'testAcceptAllArgs')
pub.subscribe(listenAllArgs2, 'testAcceptAllArgs')
pub.subscribe(listenAllArgs2, 'testAcceptAllArgs.subtopic')
pub.subscribe(listenAllArgs, 'testAcceptAllArgs.subtopic')
def testUnsubAll(self):
def lisnr1():
pass
def lisnr2():
pass
class MyListener:
def __call__(self):
pass
def meth(self):
pass
def __hash__(self):
return 123
lisnr3 = MyListener()
lisnr4 = lisnr3.meth
def lisnrSub(listener=None, topic=None, newSub=None): pass
pub.subscribe(lisnrSub, 'pubsub.subscribe')
self.assertEqual(pub.getTopic('pubsub.subscribe').getNumListeners(), 1)
def subAll():
pub.subscribe(lisnr1, 'testUnsubAll')
pub.subscribe(lisnr2, 'testUnsubAll')
pub.subscribe(lisnr3, 'testUnsubAll')
pub.subscribe(lisnr4, 'testUnsubAll')
self.assertEqual(pub.getTopic('testUnsubAll').getNumListeners(), 4)
def filter(lisnr):
passes = str(lisnr).endswith('meth')
return passes
# test unsub many non-pubsub topic listeners
subAll()
pub.unsubAll('testUnsubAll')
self.assertEqual(pub.getTopic('testUnsubAll').getNumListeners(), 0)
self.assertEqual(pub.getTopic('pubsub.subscribe').getNumListeners(), 1)
# now same but with filter:
subAll()
unsubed = pub.unsubAll('testUnsubAll', listenerFilter=filter)
self.assertEqual(pub.getTopic('testUnsubAll').getNumListeners(), 3)
self.assertEqual(pub.getTopic('pubsub.subscribe').getNumListeners(), 1)
# test unsub all listeners of all topics
subAll()
self.assertEqual(pub.getTopic('testUnsubAll').getNumListeners(), 4)
unsubed = pub.unsubAll(listenerFilter=filter)
self.assertEqual(unsubed, [lisnr4])
self.assertEqual(pub.getTopic('testUnsubAll').getNumListeners(), 3)
self.assertEqual(pub.getTopic('pubsub.subscribe').getNumListeners(), 1)
unsubed = set( pub.unsubAll() )
expect = set([lisnr1, lisnrSub, lisnr3, lisnr2])
# at least all the 'expected' ones were unsub'd; will be others if this
# test is run after other unit tests in same nosetests run
assert unsubed >= expect
def testSendForUndefinedTopic(self):
pub.sendMessage('testSendForUndefinedTopic')
assert pub.getTopic('testSendForUndefinedTopic')
self.assertEqual(pub.getTopic('testSendForUndefinedTopic').getArgs(),
(None, None))
# must also check for subtopics if parents have listeners since
# filtering of args is affected
def listener():
pass
pub.subscribe(listener, 'testSendForUndefinedTopic')
pub.sendMessage('testSendForUndefinedTopic.subtopic', msg='something')
def testTopicUnspecifiedError(self):
self.assertRaises(pub.ListenerSpecIncomplete, pub.setTopicUnspecifiedFatal)
pub.setTopicUnspecifiedFatal(checkExisting=False)
def fn():
pass
LSI = pub.ListenerSpecIncomplete
self.assertRaises(LSI, pub.sendMessage, 'testTopicUnspecifiedError')
self.assertRaises(LSI, pub.subscribe, fn, 'testTopicUnspecifiedError')
pub.setTopicUnspecifiedFatal(False)
pub.sendMessage('testTopicUnspecifiedError')
pub.subscribe(fn, 'testTopicUnspecifiedError')
def testArgSpecDerivation(self):
def ok_0():
pass
def ok_1(arg1):
pass
def err_11(arg1=None):
pass # required can't become optional!
def err_12(arg2):
pass # parent's arg1 missing
def ok_2(arg1=None):
pass
def ok_21(arg1):
pass # optional can become required
def err_22(arg2):
pass # parent's arg1 missing
# with getOrCreateTopic(topic, proto), the 'required args' set
# is garanteed to be a subset of 'all args'
pub.getOrCreateTopic('tasd', ok_0)
pub.getOrCreateTopic('tasd.t_1', ok_1)
self.assertRaises(pub.ListenerSpecInvalid, pub.getOrCreateTopic,
'tasd.t_1.t_11', err_11)
self.assertRaises(pub.ListenerSpecInvalid, pub.getOrCreateTopic,
'tasd.t_1.t_12', err_12)
pub.getOrCreateTopic('tasd.t_2', ok_2)
pub.getOrCreateTopic('tasd.t_2.t_21', ok_21)
self.assertRaises(pub.ListenerSpecInvalid, pub.getOrCreateTopic,
'tasd.t_2.t_22', err_22)
# with newTopic(), 'required args' specified separately so
# verify that errors caught
def check(subName, required=(), **args):
tName = 'tasd.'+subName
try:
pub.newTopic(tName, 'desc', required, **args)
msg = 'Should have raised pub.ListenerSpecInvalid for %s, %s, %s'
assert False, msg % (tName, required, args)
except pub.ListenerSpecInvalid, exc:
#import traceback
#traceback.print_exc()
print 'As expected: ', exc
pub.newTopic('tasd.t_1.t_13', 'desc', ('arg1',), arg1='docs for arg1') # ok_1
check('t_1.t_14', arg1='docs for arg1') # err_11
check('t_1.t_15', ('arg2',), arg2='docs for arg2') # err_12
pub.newTopic('tasd.t_2.t_23', 'desc', ('arg1',), arg1='docs for arg1') # ok_21
check('t_2.t_24', ('arg2',), arg2='docs for arg2') # err_22
# check when no inheritence involved
# reqd args wrong
check('t_1.t_16', ('arg1',), arg2='docs for arg2')
check('t_1.t_17', ('arg2',), arg1='docs for arg1')
check('t_3', ('arg1',), arg2='docs for arg2')
#---------------------------------------------------------------------------
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,38 @@
"""
:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved.
:license: BSD, see LICENSE.txt for details.
"""
import imp_unittest, unittest
import wtc
from wx.lib.pubsub import pub
from wx.lib.pubsub.utils import notification
from StringIO import StringIO
#---------------------------------------------------------------------------
class lib_pubsub_DefaultLog(wtc.WidgetTestCase):
def testNotifications(self):
capture = StringIO()
logger = notification.useNotifyByWriteFile(capture)
def block():
def listener(): pass
pub.subscribe(listener, 'testNotifications')
block()
#---------------------------------------------------------------------------
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,148 @@
"""
:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved.
:license: BSD, see LICENSE.txt for details.
"""
import imp_unittest, unittest
import wtc
from wx.lib.pubsub import pub
def throws():
raise RuntimeError('test')
#---------------------------------------------------------------------------
class lib_pubsub_Except(wtc.WidgetTestCase):
def testHandleExcept1a(self):
from wx.lib.pubsub.utils.exchandling import ExcPublisher
excPublisher = ExcPublisher( pub.getDefaultTopicMgr() )
pub.setListenerExcHandler(excPublisher)
# create a listener that raises an exception:
from lib_pubsub_except_raisinglistener import getRaisingListener
raisingListener = getRaisingListener()
pub.setNotificationFlags(all=False)
pub.subscribe(raisingListener, 'testHandleExcept1a')
# first test when a listener raises an exception and exception listener also raises!
class BadUncaughtExcListener:
def __call__(self, listenerStr=None, excTraceback=None):
raise RuntimeError('bad exception listener!')
handler = BadUncaughtExcListener()
pub.subscribe(handler, ExcPublisher.topicUncaughtExc)
self.assertRaises(pub.ExcHandlerError, pub.sendMessage,
'testHandleExcept1a')
def testHandleExcept1b(self):
# create a listener that raises an exception:
from lib_pubsub_except_raisinglistener import getRaisingListener
raisingListener = getRaisingListener()
pub.subscribe(raisingListener, 'testHandleExcept1b')
# subscribe a good exception listener and validate
# create the listener for uncaught exceptions in listeners:
class UncaughtExcListener:
def __call__(self, listenerStr=None, excTraceback=None):
# verify that information received; first the listenerStr
assert listenerStr.startswith('raisingListener')
# next the traceback:
tb = excTraceback.traceback
self.assertEqual(len(tb), 2)
def validateTB(tbItem, eFN, eLine, eFnN, assertE=self.assertEqual):
assert tbItem[0].endswith(eFN), '%s !~ %s' % (tbItem[0], eFN)
assertE(tbItem[1], eLine)
assertE(tbItem[2], eFnN)
validateTB(tb[0], 'lib_pubsub_except_raisinglistener.py', 5, 'raisingListener')
validateTB(tb[1], 'lib_pubsub_except_raisinglistener.py', 4, 'nested')
# next the formatted traceback:
self.assertEqual(len(excTraceback.getFormattedList() ), len(tb)+1)
# finally the string for formatted traceback:
msg = excTraceback.getFormattedString()
#print 'Msg "%s"' % msg
assert msg.startswith(' File')
assert msg.endswith("global name 'RuntimeError2' is not defined\n")
from wx.lib.pubsub.utils.exchandling import ExcPublisher
topic = pub.getTopic(ExcPublisher.topicUncaughtExc)
assert not topic.hasListeners()
handler = UncaughtExcListener()
handler.assertEqual = self.assertEqual
pub.subscribe(handler, ExcPublisher.topicUncaughtExc)
pub.sendMessage('testHandleExcept1b')
# verify that listener isn't stuck in a cyclic reference by sys.exc_info()
del raisingListener
assert not pub.getTopic('testHandleExcept1b').hasListeners()
def testHandleExcept2(self):
#Test sendMessage when one handler, then change handler and verify changed
testTopic = 'testTopics.testHandleExcept2'
pub.subscribe(throws, testTopic)
pub.setListenerExcHandler(None)
#pubsub.utils.notification.useNotifyByWriteFile()
#assert_equal( pub.getTopic(testTopic).getNumListeners(), 1 )
expect = None
def validate(className):
global expect
assert expect == className
expect = None
class MyExcHandler:
def __call__(self, listener, topicObj):
validate(self.__class__.__name__)
class MyExcHandler2:
def __call__(self, listener, topicObj):
validate(self.__class__.__name__)
def doHandling(HandlerClass):
global expect
expect = HandlerClass.__name__ #'MyExcHandler'
excHandler = HandlerClass()
pub.setListenerExcHandler(excHandler)
pub.sendMessage(testTopic)
assert expect is None
doHandling(MyExcHandler)
doHandling(MyExcHandler2)
# restore to no handling and verify:
pub.setListenerExcHandler(None)
self.assertRaises(RuntimeError, pub.sendMessage, testTopic)
def testNoExceptionHandling1(self):
pub.setListenerExcHandler(None)
def raises():
raise RuntimeError('test')
pub.getOrCreateTopic('testNoExceptionTrapping')
pub.subscribe(raises, 'testNoExceptionTrapping')
self.assertRaises(RuntimeError, pub.sendMessage, 'testNoExceptionTrapping')
def testNoExceptionHandling2(self):
testTopic = 'testTopics.testNoExceptionHandling'
pub.subscribe(throws, testTopic)
assert pub.getListenerExcHandler() is None
self.assertRaises(RuntimeError, pub.sendMessage, testTopic)
#---------------------------------------------------------------------------
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,349 @@
"""
:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved.
:license: BSD, see LICENSE.txt for details.
"""
import imp_unittest, unittest
import wtc
from wx.lib.pubsub.core.weakmethod import WeakMethod
from wx.lib.pubsub.core import listener
from wx.lib.pubsub.core.listener import \
Listener, ListenerValidator, \
ListenerInadequate, \
CallArgsInfo, \
getArgs
#---------------------------------------------------------------------------
class ArgsInfoMock:
def __init__(self, autoTopicArgName=None):
self.autoTopicArgName = autoTopicArgName
self.acceptsAllKwargs = False
#---------------------------------------------------------------------------
class lib_pubsub_ArgsInfo(wtc.WidgetTestCase):
def test0_ArgsInfo(self):
def listener0(msgTopic = Listener.AUTO_TOPIC):
pass
CallArgsInfo(listener0, 0)
def listener1(arg1, msgTopic = Listener.AUTO_TOPIC):
pass
CallArgsInfo(listener1, 1)
def test2_Validation0(self):
# Test when ValidatorSameKwargsOnly used, ie when args in
# listener and topic must be exact match (unless *arg).
AA = Listener.AUTO_TOPIC
# test for topic that has no arg/kwargs in topic message spec (TMS)
def same():
pass
def varargs(*args, **kwargs):
pass
def autoArg(msgTopic=AA):
pass
def extraArg(a):
pass
def extraKwarg(a=1):
pass
# no arg/kwarg in topic message spec (TMS)
validator = ListenerValidator([], [])
validate = validator.validate
validate(same) # ok: same
validate(varargs) # ok: *args/**kwargs
validate(autoArg) # ok: extra but AUTO_TOPIC
self.assertRaises(ListenerInadequate, validate, extraArg) # E: extra arg
self.assertRaises(ListenerInadequate, validate, extraKwarg) # E: extra kwarg
def test2_Validation1(self):
# one arg/kwarg in topic
validator = ListenerValidator(['a'], ['b'])
validate = validator.validate
def same(a, b=1):
pass
def same2(a=2, b=1):
pass
def varkwargs(**kwargs):
pass
def varkwargs_a(a, **kwargs):
pass
def opt_reqd(b, **kwargs):
pass
def missing_arg(b=1):
pass
def missing_kwarg(a):
pass
def extra_kwarg1(a,b=1,c=2):
pass
def extra_kwarg2(c=1, *args, **kwargs):
pass
def extra_arg1(a,c,b=1):
pass
def extra_arg2(a,b,c=2):
pass
validate(same) # ok: same
validate(same2) # ok: same even if a now has default value
validate(varkwargs_a) # ok: has **kwargs
validate(varkwargs) # ok: has **kwargs
self.assertRaises(ListenerInadequate, validate, opt_reqd) # E: b now required
self.assertRaises(ListenerInadequate, validate, missing_arg) # E: missing arg
self.assertRaises(ListenerInadequate, validate, missing_kwarg) # E: missing kwarg
self.assertRaises(ListenerInadequate, validate, extra_kwarg1) # E: extra kwarg
self.assertRaises( ListenerInadequate, validate, extra_kwarg2) # E: extra kwarg
self.assertRaises( ListenerInadequate, validate, extra_arg1) # E: extra arg
self.assertRaises( ListenerInadequate, validate, extra_arg2) # E: extra arg
def test3_IsCallable(self):
# Test the proper trapping of non-callable and certain types of
# callable objects.
# validate different types of callables
validator = ListenerValidator([], [])
# not a function:
notAFunc = 1 # just pick something that is not a function
self.assertRaises(ListenerInadequate, validator.validate, notAFunc)
# a regular function:
def aFunc():
pass
validator.validate(aFunc)
# a functor and a method
class Foo(object):
def __call__(self):
pass
def meth(self):
pass
foo = Foo()
validator.validate(foo)
validator.validate(foo.meth)
def test4_WantTopic(self):
# Test the correct determination of whether want topic
# auto-passed during sendMessage() calls.
# first check proper breakdown of listener args:
def listener(a, b=1):
pass
argsInfo = CallArgsInfo(listener, 0)
self.assertEqual(None, argsInfo.autoTopicArgName )
msgTopic = 'auto'
class MyListener:
def method(self, a, b=1, auto=Listener.AUTO_TOPIC):
pass
listener = MyListener()
argsInfo = getArgs(listener.method)
self.assertEqual(msgTopic, argsInfo.autoTopicArgName )
self.assertEqual(['a','b'], argsInfo.allParams )
# now some white box testing of validator that makes use of args info:
def checkWantTopic(validate, listener, wantTopicAsArg=None):
argsInfo = getArgs(listener)
self.assertEqual(argsInfo.autoTopicArgName, wantTopicAsArg)
validate(listener)
validator = ListenerValidator([], ['a'])
validate = validator.validate
def noWant(a=1):
pass
def want1(a=1, auto=Listener.AUTO_TOPIC):
pass
checkWantTopic(validate, noWant)
checkWantTopic(validate, want1, msgTopic)
validator = ListenerValidator(['a'], ['b'])
validate = validator.validate
def noWant2(a, b=1):
pass
def want2(a, auto=Listener.AUTO_TOPIC, b=1):
pass
checkWantTopic(validate, noWant2)
checkWantTopic(validate, want2, msgTopic)
# topic that has Listener.AUTO_TOPIC as an arg rather than kwarg
validator = ListenerValidator([msgTopic], ['b'])
validate = validator.validate
def noWant3(auto, b=1):
pass
checkWantTopic(validate, noWant3)
def test5_DOAListeners(self):
# Test "dead on arrival"
# test DOA of unbound method
def getListener1():
class DOA:
def tmpFn(self):
pass
Listener( DOA.tmpFn, ArgsInfoMock() )
self.assertRaises(ValueError, getListener1)
# test DOA of tmp callable:
def getListener2():
def fn():
pass
class Wrapper:
def __init__(self, func):
self.func = func
def __call__(self):
pass
def onDead(listenerObj):
pass
# check dead-on-arrival when no death callback specified:
doa1 = Listener( Wrapper(fn), ArgsInfoMock() )
assert doa1.getCallable() is None
assert doa1.isDead()
self.assertRaises(RuntimeError, doa1, None, {})
# check dead-on-arrival when a death callback specified:
doa2 = Listener( Wrapper(fn), ArgsInfoMock(), onDead )
assert doa2.getCallable() is None
assert doa2.isDead()
self.assertRaises(RuntimeError, doa2, None, {})
getListener2()
def test6_ListenerEq(self):
# Test equality tests of two listeners
def listener1():
pass
def listener2():
pass
l1 = Listener(listener1, ArgsInfoMock())
l2 = Listener(listener2, ArgsInfoMock())
# verify that Listener can be compared for equality to another Listener, weakref, or callable
self.assertEqual (l1, l1)
self.assertNotEqual (l1, l2)
self.assertEqual (l1, listener1)
self.assertNotEqual (l1, listener2)
ll = [l1]
assert listener1 in ll
assert listener2 not in ll
self.assertEqual(ll.index(listener1), 0)
# now for class method listener:
class MyListener:
def __call__(self):
pass
def meth(self):
pass
listener3 = MyListener()
l3 = Listener(listener3, ArgsInfoMock() )
self.assertNotEqual (l3, l1)
self.assertNotEqual (l3, l2)
self.assertNotEqual (l3, listener2)
self.assertEqual (l3, l3)
self.assertEqual (l3, listener3)
self.assertNotEqual (l3, listener3.__call__)
l4 = Listener(listener3.meth, ArgsInfoMock())
self.assertEqual (l4, l4)
self.assertNotEqual (l4, l3)
self.assertNotEqual (l4, l2)
self.assertNotEqual (l4, listener3.__call__)
self.assertEqual (l4, listener3.meth)
def test7_DyingListenersClass(self):
# Test notification callbacks when listener dies
# test dead listener notification
def onDead(weakListener):
lsrs.remove(weakListener)
def listener1():
pass
def listener2():
pass
def listener3():
pass
lsrs = []
lsrs.append(Listener(listener1, ArgsInfoMock(False), onDead=onDead))
lsrs.append(Listener(listener2, ArgsInfoMock(False), onDead=onDead))
lsrs.append(Listener(listener3, ArgsInfoMock(False), onDead=onDead))
# now force some listeners to die, verify lsrs list
self.assertEqual(len(lsrs), 3)
del listener1
self.assertEqual(len(lsrs), 2)
self.assertEqual(lsrs[0], listener2)
self.assertEqual(lsrs[1], listener3)
del listener2
self.assertEqual(len(lsrs), 1)
self.assertEqual(lsrs[0], listener3)
del listener3
self.assertEqual(len(lsrs), 0)
def test8_getArgsBadListener(self):
self.assertRaises(ListenerInadequate, getArgs, 1)
try:
getArgs(1)
except ListenerInadequate, exc:
msg = 'Listener "int" (from module "__main__") inadequate: type "int" not supported'
self.assertEqual(str(exc), msg)
def test10_weakMethod(self):
class Foo:
def meth(self):
pass
foo = Foo()
wm = WeakMethod(foo.meth)
str(wm)
def test11_testNaming(self):
aiMock = ArgsInfoMock()
# define various type of listeners
def fn():
pass
class Foo:
def __call__(self):
pass
def meth(self):
pass
ll = Listener(fn, aiMock)
self.assertEqual(ll.typeName(), "fn")
self.assertEqual(ll.module(), "test_lib_pubsub_listener")
assert not ll.wantsTopicObjOnCall()
foo = Foo()
ll = Listener(foo, aiMock)
self.assertEqual(ll.typeName(), "Foo")
self.assertEqual(ll.module(), "test_lib_pubsub_listener")
assert not ll.wantsTopicObjOnCall()
ll = Listener(foo.meth, ArgsInfoMock('argName'))
self.assertEqual(ll.typeName(), "Foo.meth")
self.assertEqual(ll.module(), "test_lib_pubsub_listener")
assert ll.wantsTopicObjOnCall()
#---------------------------------------------------------------------------
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,161 @@
"""
:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved.
:license: BSD, see LICENSE.txt for details.
"""
import imp_unittest, unittest
import wtc
from difflib import ndiff, unified_diff, context_diff
# setup notification and logging
from wx.lib.pubsub import pub
from wx.lib.pubsub.utils.notification import useNotifyByWriteFile, INotificationHandler
#---------------------------------------------------------------------------
class lib_pubsub_Notify(wtc.WidgetTestCase):
def captureStdout(self):
from StringIO import StringIO
capture = StringIO()
useNotifyByWriteFile( fileObj = capture )
return capture
def testNotifyByPrint(self):
capture = self.captureStdout()
def listener1(arg1):
pass
pub.subscribe(listener1, 'baz')
pub.sendMessage('baz', arg1=123)
pub.unsubscribe(listener1, 'baz')
def doa():
def listener2():
pass
pub.subscribe(listener2, 'bar')
doa()
pub.delTopic('baz')
expect = '''\
PUBSUB: New topic "baz" created
PUBSUB: Subscribed listener "listener1" to topic "baz"
PUBSUB: Start sending message of topic "baz"
PUBSUB: Sending message of topic "baz" to listener listener1
PUBSUB: Done sending message of topic "baz"
PUBSUB: Unsubscribed listener "listener1" from topic "baz"
PUBSUB: New topic "bar" created
PUBSUB: Subscribed listener "listener2" to topic "bar"
PUBSUB: Listener "listener2" of Topic "bar" has died
PUBSUB: Topic "baz" destroyed
'''
captured = capture.getvalue()
# strip as other wise one has \n, at least on windows
assert captured.strip() == expect.strip(), \
'\n'.join( unified_diff(expect.splitlines(), captured.splitlines(), n=0) )
def testFlagChanges(self):
savedFlags = pub.getNotificationFlags()
pub.setNotificationFlags(all=True, sendMessage=False, deadListener=False)
flags = pub.getNotificationFlags()
assert not flags['sendMessage']
assert not flags['deadListener']
assert flags['newTopic']
assert flags['delTopic']
assert flags['subscribe']
assert flags['unsubscribe']
pub.setNotificationFlags(subscribe=False, deadListener=True)
flags = pub.getNotificationFlags()
assert not flags['sendMessage']
assert not flags['subscribe']
assert flags['newTopic']
assert flags['delTopic']
assert flags['deadListener']
assert flags['unsubscribe']
pub.setNotificationFlags(all=False, subscribe=True, unsubscribe=True)
flags = pub.getNotificationFlags()
assert not flags['sendMessage']
assert not flags['deadListener']
assert not flags['newTopic']
assert not flags['delTopic']
assert flags['subscribe']
assert flags['unsubscribe']
pub.setNotificationFlags(** savedFlags)
def testNotifications(self):
class Handler(INotificationHandler):
def __init__(self):
self.resetCounts()
def resetCounts(self):
self.counts = dict(send=0, sub=0, unsub=0, delt=0, newt=0, dead=0, all=0)
def notifySubscribe(self, pubListener, topicObj, newSub):
self.counts['sub'] += 1
def notifyUnsubscribe(self, pubListener, topicObj):
self.counts['unsub'] += 1
def notifyDeadListener(self, pubListener, topicObj):
self.counts['dead'] += 1
def notifySend(self, stage, topicObj, pubListener=None):
if stage == 'pre': self.counts['send'] += 1
def notifyNewTopic(self, topicObj, description, required, argsDocs):
self.counts['newt'] += 1
def notifyDelTopic(self, topicName):
self.counts['delt'] += 1
notifiee = Handler()
pub.addNotificationHandler(notifiee)
pub.setNotificationFlags(all=True)
def verify(**ref):
for key, val in notifiee.counts.iteritems():
if key in ref:
self.assertEqual(val, ref[key], "\n%s\n%s" % (notifiee.counts, ref) )
else:
self.assertEqual(val, 0, "%s = %s, expected 0" % (key, val))
notifiee.resetCounts()
verify()
def testListener():
pass
def testListener2():
pass
pub.getOrCreateTopic('newTopic')
verify(newt=1)
pub.subscribe(testListener, 'newTopic')
pub.subscribe(testListener2, 'newTopic')
verify(sub=2)
pub.sendMessage('newTopic')
verify(send=1)
del testListener
verify(dead=1)
pub.unsubscribe(testListener2,'newTopic')
verify(unsub=1)
pub.delTopic('newTopic')
verify(delt=1)
#---------------------------------------------------------------------------
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,116 @@
"""
:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved.
:license: BSD, see LICENSE.txt for details.
"""
import imp_unittest, unittest
import wtc
from wx.lib.pubsub import pub
from wx.lib.pubsub.utils.notification import useNotifyByPubsubMessage
topicMgr = pub.getDefaultTopicMgr()
#---------------------------------------------------------------------------
class lib_pubsub_Notify2(wtc.WidgetTestCase):
def test0_NotificationTopics(self):
assert not topicMgr.getTopic('pubsub', okIfNone=True)
useNotifyByPubsubMessage()
assert topicMgr.getTopic('pubsub')
assert topicMgr.getTopic('pubsub').hasSubtopic()
pubsubTopicNames = [obj.getName() for obj in topicMgr.getTopic('pubsub').getSubtopics()]
self.assertEqual(
set( pubsubTopicNames ),
set(['pubsub.sendMessage', 'pubsub.deadListener',
'pubsub.subscribe', 'pubsub.unsubscribe',
'pubsub.newTopic', 'pubsub.delTopic'])
)
def test1_SubscribeNotify(self):
class MyListener:
countSub = 0
countUnsub = 0
def listenerSub(self, msgTopic=pub.AUTO_TOPIC, listener=None,
topic=None, newSub=None):
self.assertEqual(msgTopic.getName(), 'pubsub.subscribe' )
assert topic.getName() in ('pubsub.unsubscribe', 'testSubscribeNotify')
if newSub:
self.countSub += 1
def listenerUnsub(self, msgTopic=pub.AUTO_TOPIC, topic=None,
listener=None, listenerRaw=None):
assert topic.getName() in ('testSubscribeNotify', 'pubsub.subscribe' )
self.assertEqual(msgTopic.getName(), 'pubsub.unsubscribe' )
if listener is not None:
self.countUnsub += 1
def listenerTest(self):
raise NotImplementedError # should never get here
pub.setNotificationFlags(subscribe=True, unsubscribe=True)
pub.getOrCreateTopic('testSubscribeNotify')
tmp = MyListener()
tmp.assertEqual = self.assertEqual
pub.subscribe(tmp.listenerSub, 'pubsub.subscribe')
self.assertEqual(tmp.countSub, 0) # don't notify of self subscription
self.assertEqual(tmp.countUnsub, 0)
sl, ok = pub.subscribe(tmp.listenerUnsub, 'pubsub.unsubscribe')
assert ok
self.assertEqual(tmp.countSub, 1)
self.assertEqual(tmp.countUnsub, 0)
pub.subscribe(tmp.listenerTest, 'testSubscribeNotify')
self.assertEqual(tmp.countUnsub, 0)
pub.unsubscribe(tmp.listenerTest, 'testSubscribeNotify')
self.assertEqual(tmp.countUnsub, 1)
pub.unsubscribe(tmp.listenerSub, 'pubsub.subscribe')
self.assertEqual(tmp.countSub, 2)
self.assertEqual(tmp.countUnsub, 2)
pub.unsubscribe(tmp.listenerUnsub, 'pubsub.unsubscribe')
self.assertEqual(tmp.countSub, 2)
self.assertEqual(tmp.countUnsub, 2) # don't notify of self unsubscription
def test2_SendNotify(self):
# trap the pubsub.sendMessage topic:
class SendHandler:
def __init__(self):
self.pre = self.post = 0
def __call__(self, topic=None, stage=None, listener=None, msgTopic=pub.AUTO_TOPIC):
if stage == 'pre':
self.pre += 1
else:
self.post += 1
self.assertEqual(msgTopic.getName(), 'pubsub.sendMessage')
self.assertEqual(topic.getName(), 'testSendNotify')
sh = SendHandler()
sh.assertEqual = self.assertEqual
pub.subscribe(sh, 'pubsub.sendMessage')
pub.setNotificationFlags(sendMessage=True)
# generate a message that will cause pubsub.sendMessage to be generated too
assert sh.pre == 0
assert sh.post == 0
pub.getOrCreateTopic('testSendNotify')
pub.sendMessage('testSendNotify')
assert sh.pre == 1
assert sh.post == 1
#---------------------------------------------------------------------------
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,260 @@
"""
:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved.
:license: BSD, see LICENSE.txt for details.
"""
import imp_unittest, unittest
import wtc
from textwrap import dedent
from wx.lib.pubsub import pub
class my_topics:
class rootTopic1:
'''Root topic 1'''
class subtopic_1:
'''
Sub topic 1 of root topic. Docs rely on one blank line for
topic doc, and indentation for each argument doc.
'''
def msgDataSpec(arg1, arg2=None):
'''
- arg1: some multiline doc
for arg1
- arg2: some multiline doc
for arg2
'''
pass
class subsubtopic_12:
'''Sub sub topic 2 of sub topic 1.'''
def msgDataSpec(arg1, argA, arg2=None, argB=None):
'''
- argA: doc for argA
- argB: doc for argB
'''
pass
class rootTopic2:
'''Root topic 2'''
#---------------------------------------------------------------------------
class lib_pubsub_Except(wtc.WidgetTestCase):
def test1(self):
pub.importTopicTree(my_topics)
provString = """
class rootTopic1:
class subtopic_1:
class subsubtopic_11:
'''
Sub sub topic 1 of sub topic 1. Only need to doc the
extra args.
'''
def msgDataSpec(arg1, arg3, arg2=None, arg4=None):
'''
- arg3: doc for arg3
- arg4: doc for arg4
'''
pass
"""
pub.importTopicTree(provString, format=pub.TOPIC_TREE_FROM_STRING)
provFile = """
class rootTopic1:
class subtopic_2:
class subsubtopic_21:
'''Sub sub topic 1 of sub topic 2.'''
def msgDataSpec(arg1, arg2=None, someArg=456, arg4=None):
'''
- arg1: doc for arg1
- arg2: doc for arg2
- arg4: doc for arg4
'''
pass
"""
myTopicTree = file('myTopicTree.py', 'w')
myTopicTree.write(dedent(provFile))
myTopicTree.close()
pub.importTopicTree('myTopicTree', format=pub.TOPIC_TREE_FROM_MODULE, lazy=True)
import os
os.remove('myTopicTree.py')
if os.path.exists('myTopicTree.pyc'):
os.remove('myTopicTree.pyc')
assert not pub.getTopic('rootTopic1.subtopic_2', okIfNone=True)
# the following should create all topic tree since parent
# topics are automatically created
assert pub.getOrCreateTopic('rootTopic1.subtopic_1.subsubtopic_11')
assert pub.getOrCreateTopic('rootTopic1.subtopic_1.subsubtopic_12')
assert pub.getOrCreateTopic('rootTopic1.subtopic_2.subsubtopic_21')
# validate that topic specs were properly parsed
def isValid(topicName, listener):
topic = pub.getTopic(topicName)
assert topic.getDescription()
assert topic.isSendable()
return topic.isValid(listener)
def sub():
pass
def sub_1(arg1, arg2=123):
pass
def sub_11(arg1, arg3, arg2=None, arg4=None):
pass
assert isValid('rootTopic1', sub)
assert isValid('rootTopic1.subtopic_1', sub_1)
assert isValid('rootTopic1.subtopic_1.subsubtopic_11', sub_11)
# no providers have spec for subtopic_2
assert not pub.getTopic('rootTopic1.subtopic_2').isSendable()
#printTreeSpec()
pub.exportTopicTree('newTopicTree')
root2Defn = pub.exportTopicTree(rootTopicName='rootTopic1')
import os
os.remove('newTopicTree.py')
if os.path.exists('newTopicTree.pyc'):
os.remove('newTopicTree.pyc')
def test2_import_export_no_change(self):
#
# Test that import/export/import does not change the import
#
importStr = """
'''Tree docs, can be anything you want.'''
class test_import_export_no_change:
'''Root topic 1.'''
class subtopic_1:
'''
Sub topic 1 of root topic. Docs rely on one
blank line for topic doc, and indentation for
each argument doc.
'''
def msgDataSpec(arg1, arg2=None):
'''
- arg1: some multiline doc
for arg1
- arg2: some multiline doc
for arg2
'''
pass
"""
pub.clearTopicDefnProviders()
treeDoc = pub.importTopicTree(importStr, lazy = True,
format = pub.TOPIC_TREE_FROM_STRING)
assert treeDoc == '''Tree docs, can be anything you want.'''
root = pub.getOrCreateTopic('test_import_export_no_change.subtopic_1')
# few sanity checks
def sub_1(arg1, arg2=None):
pass
assert root.isSendable()
assert pub.isValid(sub_1, 'test_import_export_no_change.subtopic_1')
# export tree
exported = pub.exportTopicTree(rootTopicName='test_import_export_no_change', moduleDoc=treeDoc)
#print exported
expectExport = """\
# Automatically generated by TopicTreeAsSpec(**kwargs).
# The kwargs were:
# - fileObj: StringIO
# - width: 70
# - treeDoc: 'Tree docs, can be anything you want....'
# - indentStep: 4
# - footer: '# End of topic tree definition. Note that application may l...'
'''
Tree docs, can be anything you want.
'''
class test_import_export_no_change:
'''
Root topic 1.
'''
class subtopic_1:
'''
Sub topic 1 of root topic. Docs rely on one
blank line for topic doc, and indentation for
each argument doc.
'''
def msgDataSpec(arg1, arg2=None):
'''
- arg1: some multiline doc
for arg1
- arg2: some multiline doc
for arg2
'''
# End of topic tree definition. Note that application may load
# more than one definitions provider.
"""
# check there are no differences
from difflib import context_diff, ndiff
diffs = ndiff( dedent(expectExport).splitlines(), exported.splitlines())
diffs = [d for d in diffs if not d.startswith(' ')]
#print '\n'.join(diffs)
assert diffs == ['- ', '+ ']
# now for module:
modDoc = pub.importTopicTree('lib_pubsub_provider_expect',
format=pub.TOPIC_TREE_FROM_MODULE,
lazy=False)
assert modDoc.startswith('\nTree docs, can be anything you')
pub.exportTopicTree('lib_pubsub_provider_actual',
rootTopicName='test_import_export_no_change2',
moduleDoc=treeDoc)
lines1 = file('lib_pubsub_provider_actual.py', 'r').readlines()
lines2 = file('lib_pubsub_provider_expect.py', 'r').readlines()
diffs = context_diff( lines1, lines2 )
assert not list(diffs)
def test_module_as_class(self):
assert pub.getTopic('root_topic1', True) is None
assert pub.getTopic('root_topic2.sub_topic21', True) is None
import lib_pubsub_provider_my_import_topics
pub.importTopicTree(lib_pubsub_provider_my_import_topics)
assert pub.getTopic('root_topic1') is not None
assert pub.getTopic('root_topic2.sub_topic21') is not None
pub.sendMessage(lib_pubsub_provider_my_import_topics.root_topic1)
#---------------------------------------------------------------------------
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,132 @@
"""
:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved.
:license: BSD, see LICENSE.txt for details.
"""
import imp_unittest, unittest
import wtc
from wx.lib.pubsub.core.topicargspec import \
ArgsInfo, \
ArgSpecGiven, \
SenderMissingReqdArgs, \
SenderUnknownOptArgs
#---------------------------------------------------------------------------
class lib_pubsub_Specs(wtc.WidgetTestCase):
def test1_create(self):
# root
td1 = ArgSpecGiven(dict())
ai1 = ArgsInfo(('t1',), td1, None)
assert ai1.isComplete()
assert ai1.numArgs() == 0
assert ai1.getArgs() == ()
assert ai1.getCompleteAI() is ai1
# sub, complete
td2 = ArgSpecGiven(
argsDocs = dict(arg1='doc for arg1', arg2='doc for arg2'),
reqdArgs = ('arg2',))
ai2 = ArgsInfo(('t1','st1'), td2, ai1)
assert ai2.isComplete()
assert ai2.numArgs() == 2
assert ai2.getArgs() == ('arg1', 'arg2')
assert ai2.getCompleteAI() is ai2
# sub, missing
td2.argsSpecType = ArgSpecGiven.SPEC_GIVEN_NONE
ai4 = ArgsInfo(('t1','st3'), td2, ai1)
assert not ai4.isComplete()
assert ai4.numArgs() == 0
assert ai4.getArgs() == ()
assert ai4.getCompleteAI() is ai1
# sub, of incomplete spec, given ALL args
td3 = ArgSpecGiven(
argsDocs = dict(arg1='doc for arg1', arg2='doc for arg2'),
reqdArgs = ('arg2',))
ai5 = ArgsInfo(('t1','st3','sst1'), td3, ai4)
assert ai5.isComplete()
assert ai5.numArgs() == 2
assert ai5.hasSameArgs('arg1', 'arg2')
assert ai5.getCompleteAI() is ai5
def test2_update(self):
td1 = ArgSpecGiven(dict())
td2 = ArgSpecGiven()
td4 = ArgSpecGiven()
td5 = ArgSpecGiven(
argsDocs = dict(
arg1='doc for arg1', arg2='doc for arg2',
arg3='doc for arg3', arg4='doc for arg4'),
reqdArgs = ('arg4','arg2'))
ai1 = ArgsInfo(('t1',), td1, None) # root, complete
ai2 = ArgsInfo(('t1','st1'), td2, ai1) # sub 1, empty
ai4 = ArgsInfo(('t1','st1','sst2'), td4, ai2) # empty sub of sub 1
ai5 = ArgsInfo(('t1','st1','sst3'), td5, ai2) # completed sub of sub 1
# check assumptions before we start:
assert not ai2.isComplete()
assert not ai4.isComplete()
assert ai5.isComplete()
assert ai2.numArgs() == 0
assert ai4.numArgs() == 0
assert ai5.numArgs() == 4
# pretend we have an update for ai2: all args now available
ai2.updateAllArgsFinal( ArgSpecGiven(
dict(arg1='doc for arg1', arg2='doc for arg2'),
('arg2',)) )
assert ai2.isComplete()
assert ai2.numArgs() == 2
assert ai2.hasSameArgs('arg1', 'arg2')
assert ai2.getCompleteAI() is ai2
assert not ai4.isComplete()
assert ai2.numArgs() == 2
assert ai4.numArgs() == 0
assert ai5.numArgs() == 4
assert ai4.getCompleteAI() is ai2
assert ai2.hasSameArgs('arg1', 'arg2')
assert ai5.hasSameArgs('arg1', 'arg2', 'arg3', 'arg4')
def test3_filter(self):
td = ArgSpecGiven(
argsDocs = dict(arg1='doc for arg1', arg2='doc for arg2'),
reqdArgs = ('arg2',))
ai = ArgsInfo(('t1',), td, None)
# check:
argsMissingReqd = {}
self.assertRaises(SenderMissingReqdArgs, ai.check, argsMissingReqd)
argsExtraOpt = dict(arg2=2, arg5=5)
self.assertRaises(SenderUnknownOptArgs, ai.check, argsExtraOpt)
args = dict(arg1=1, arg2=2)
ai.check(args)
# filter:
msgArgs = dict(arg1=1, arg2=2)
argsOK = msgArgs.copy()
assert ai.filterArgs( msgArgs ) == argsOK
msgArgs.update(arg3=3, arg4=4)
assert ai.filterArgs( msgArgs ) == argsOK
#---------------------------------------------------------------------------
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,168 @@
"""
:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved.
:license: BSD, see LICENSE.txt for details.
"""
import imp_unittest, unittest
import wtc
from wx.lib.pubsub.core.topicobj import Topic
from wx.lib.pubsub.core.treeconfig import TreeConfig
from wx.lib.pubsub.core.topicutils import ALL_TOPICS
from wx.lib.pubsub.core.topicargspec import ArgsInfo, ArgSpecGiven
from wx.lib.pubsub.core.listener import ListenerInadequate
from wx.lib.pubsub.core.topicexc import ListenerSpecInvalid
#---------------------------------------------------------------------------
class lib_pubsub_Topic(wtc.WidgetTestCase):
rootTopic = None
treeConfig = TreeConfig()
def test0_CreateRoot(self):
#
# Test create and then modify state of a topic object
#
nameTuple = ('root',)
description = 'root description'
msgArgsInfo = None
# when parent is None, only nameTuple=ALL_TOPICS is allowed, thereby
# guaranteeing that only one tree root can be created
self.assertRaises(ValueError, Topic, self.treeConfig, nameTuple, description, msgArgsInfo)
# create the ALL TOPICS topic; it has no message args
nameTuple = (ALL_TOPICS,)
argSpec = ArgSpecGiven(dict() )
msgArgsInfo = ArgsInfo(nameTuple, argSpec, None)
obj = Topic(self.treeConfig, nameTuple, description, msgArgsInfo)
# verify its state is as expected after creation:
assert obj.getListeners() == []
assert obj.getNumListeners() == 0
assert obj.hasListeners() == False
def listener1():
pass
def listener2():
pass
def badListener1(arg1):
pass # extra required arg
def badListener2(arg1=None):
pass # extra is optional
assert obj.isValid(listener1)
assert not obj.isValid(badListener1)
assert not obj.isValid(badListener2)
self.rootTopic = obj
def test1_SubUnsub(self):
#
# Test subscription and unsubscription of listeners
#
def listener1():
pass
def listener2():
pass
# need to run this here again to get rootTopic setup for this test
self.test0_CreateRoot()
obj = self.rootTopic
# now modify its state by subscribing listeners
obj.subscribe(listener1)
obj.subscribe(listener2)
obj.hasListener(listener1)
obj.hasListener(listener2)
assert obj.hasListeners() == True
assert set(obj.getListeners()) == set([listener1, listener2])
assert obj.getNumListeners() == 2
# try to subscribe an invalid listener
def badListener(arg1):
pass # extra required arg
self.assertRaises(ListenerInadequate, obj.subscribe, badListener)
# try unsubscribe
obj.unsubscribe(listener1)
assert obj.hasListeners() == True
assert obj.getListeners() == [listener2]
assert obj.getNumListeners() == 1
# try unsubscribe all, with filtering
obj.subscribe(listener1)
def listener3(): pass
obj.subscribe(listener3)
assert obj.getNumListeners() == 3
def ff(listener):
# use != since it is defined in terms of ==; also, put listener
# on RHS to verify works even when Listener used on RHS
return listener2 != listener
obj.unsubscribeAllListeners(filter=ff)
assert obj.getNumListeners() == 1
assert obj.getListeners() == [listener2]
obj.subscribe(listener1)
obj.subscribe(listener3)
assert obj.getNumListeners() == 3
obj.unsubscribeAllListeners()
assert obj.getNumListeners() == 0
def test2_CreateChild(self):
#
# Test creation of a child topic, subscription of listeners
#
# need to run this here again to get rootTopic setup for this test
self.test0_CreateRoot()
nameTuple = ('childOfAll',)
description = 'child description'
argsDocs = dict(arg1='arg1 desc', arg2='arg2 desc')
reqdArgs = ('arg2',)
argSpec = ArgSpecGiven(argsDocs=argsDocs, reqdArgs = reqdArgs)
msgArgsInfo = ArgsInfo(nameTuple, argSpec, self.rootTopic._getListenerSpec())
parent = Topic(self.treeConfig, nameTuple, description, msgArgsInfo,
parent=self.rootTopic)
assert parent.getParent() is self.rootTopic
# now create a child of child with wrong arguments so we can test exceptions
nameTuple = ('childOfAll', 'grandChild')
description = 'grandchild description'
def tryCreate(ad, r):
argSpec = ArgSpecGiven(argsDocs=ad, reqdArgs = r)
msgArgsInfo = ArgsInfo(nameTuple, argSpec, parent._getListenerSpec())
obj = Topic(self.treeConfig, nameTuple, description, msgArgsInfo,
parent=parent)
# test when all OK
argsDocs = dict(arg1='arg1 desc', arg2='arg2 desc')
reqdArgs = ('arg2',)
tryCreate(argsDocs, reqdArgs)
# test when requiredArg wrong
reqdArgs = ('arg3',)
self.assertRaises(ListenerSpecInvalid, tryCreate, argsDocs, reqdArgs)
reqdArgs = ()
self.assertRaises(ListenerSpecInvalid, tryCreate, argsDocs, reqdArgs)
# test when missing opt arg
argsDocs = dict(arg1='arg1 desc', arg2='arg2 desc')
reqdArgs = ('arg2',)
#---------------------------------------------------------------------------
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,426 @@
"""
:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved.
:license: BSD, see LICENSE.txt for details.
"""
import imp_unittest, unittest
import wtc
from wx.lib.pubsub import pub
from wx.lib.pubsub.pub import \
ALL_TOPICS, \
ListenerSpecInvalid, \
ITopicDefnProvider, \
TopicTreeTraverser, \
UndefinedTopic, \
UndefinedSubtopic, \
ListenerNotValidatable
from wx.lib.pubsub.core.topicmgr import \
ArgSpecGiven
from wx.lib.pubsub.core.topicutils import \
TopicNameInvalid, \
validateName
topicMgr = pub.getDefaultTopicMgr()
from wx.lib.pubsub.utils.topictreeprinter import \
printTreeDocs, ITopicTreeVisitor
class lib_pubsub_TopicMgr0_Basic(wtc.WidgetTestCase):
"""
Only tests TopicMgr methods. This must use some query methods on
topic objects to validate that TopicMgr did it's job properly.
"""
def failTopicName(self, name):
self.assertRaises(TopicNameInvalid, validateName, name)
def test10_GoodTopicNames(self):
#
# Test that valid topic names are accepted by pubsub'
#
validateName('test.asdf')
validateName('test.a')
validateName('test.a.b')
def test10_BadTopicNames(self):
#
# Test that invalid topic names are rejected by pubsub
#
# parts of topic name are 'empty'
self.failTopicName( '' )
self.failTopicName( ('',) )
self.failTopicName( ('test','asdf','') )
self.failTopicName( ('test','a', None) )
# parts of topic name have invalid char
self.failTopicName( ('test','a','b','_') )
self.failTopicName( ('(aa',) )
self.failTopicName( (ALL_TOPICS,) )
class lib_pubsub_TopicMgr1_GetOrCreate_NoDefnProv(wtc.WidgetTestCase):
"""
Only tests TopicMgr methods. This must use some query methods on
topic objects to validate that TopicMgr did it's job properly.
"""
def test10_NoProtoListener(self):
#
# Test the getOrCreateTopic without proto listener
#
def verifyNonSendable(topicObj, nameTuple, parent):
"""Any non-sendable topic will satisfy these conditions:"""
self.assertEqual(0, topicMgr.isTopicSpecified(nameTuple))
assert not topicObj.isSendable()
assert topicObj.getListeners() == []
assert topicObj.getNameTuple() == nameTuple
assert topicObj.getNumListeners() == 0
assert topicObj.getParent() is parent
assert topicObj.getNodeName() == topicObj.getNameTuple()[-1]
def foobar():
pass
assert not topicObj.hasListener(foobar)
assert not topicObj.hasListeners()
assert not topicObj.hasSubtopic('asdfafs')
assert not topicObj.isAll()
self.assertRaises(ListenerNotValidatable, topicObj.isValid, foobar)
self.assertRaises(ListenerNotValidatable, topicObj.validate, foobar)
# check that getTopic and getOrCreateTopic won't create again:
assert topicMgr.getOrCreateTopic(nameTuple) is topicObj
assert topicMgr.getTopic(nameTuple) is topicObj
# test with a root topic
rootName = 'GetOrCreate_NoProtoListener'
tName = rootName
# verify doesn't exist yet
assert topicMgr.getTopic(tName, True) is None
# ok create it, unsendable
rootTopic = topicMgr.getOrCreateTopic(tName)
verifyNonSendable(rootTopic, (rootName,), topicMgr.getRootTopic())
DESC_NO_SPEC = 'UNDOCUMENTED: created without spec'
assert rootTopic.getDescription() == DESC_NO_SPEC
assert rootTopic.isRoot()
assert rootTopic.getSubtopics() == []
assert not rootTopic.isAll()
assert not rootTopic.hasSubtopic()
# test with a subtopic
tName1 = (rootName, 'stB')
tName2 = tName1 + ('sstC',)
assert topicMgr.getTopic(tName1, True) is None
assert topicMgr.getTopic(tName2, True) is None
subsubTopic = topicMgr.getOrCreateTopic(tName2)
# verify that parent was created implicitly
subTopic = topicMgr.getTopic(tName1)
verifyNonSendable(subTopic, tName1, rootTopic)
verifyNonSendable(subsubTopic, tName2, subTopic)
assert subsubTopic.getDescription() == DESC_NO_SPEC
DESC_PARENT_NO_SPEC = 'UNDOCUMENTED: created as parent without specification'
assert subTopic.getDescription() == DESC_PARENT_NO_SPEC
assert rootTopic.getSubtopics() == [subTopic]
assert rootTopic.hasSubtopic()
assert subTopic.getSubtopics() == [subsubTopic]
assert subTopic.hasSubtopic()
assert subsubTopic.getSubtopics() == []
assert not subsubTopic.hasSubtopic()
# check that getTopic raises expected exception when undefined topic:
tName = 'Undefined'
self.assertRaises(UndefinedTopic, topicMgr.getTopic, tName)
tName = rootName + '.Undefined'
self.assertRaises(UndefinedSubtopic, topicMgr.getTopic, tName)
def test20_WithProtoListener(self):
#
# Test the getOrCreateTopic with proto listener
#
rootName = 'GetOrCreate_WithProtoListener'
tName = rootName
# verify doesn't exist yet
assert topicMgr.getTopic(tName, True) is None
def protoListener(arg1, arg2=None):
pass
# ok create it, sendable
rootTopic = topicMgr.getOrCreateTopic(tName, protoListener)
# check that getTopic and getOrCreateTopic won't create again:
assert topicMgr.getOrCreateTopic(tName) is rootTopic
assert topicMgr.getTopic(tName) is rootTopic
assert rootTopic.isSendable()
assert topicMgr.isTopicSpecified(tName)
expectDesc = 'UNDOCUMENTED: created from protoListener "protoListener" in module test_lib_pubsub_topicmgr'
assert rootTopic.getDescription() == expectDesc
# check that topic created can discern between good and bad listener
assert rootTopic.isValid(protoListener)
def badListener1():
pass # missing required arg
def badListener2(arg2):
pass # opt arg is required
def badListener3(arg1, arg3):
pass # extra required arg
assert not rootTopic.isValid(badListener1)
assert not rootTopic.isValid(badListener2)
assert not rootTopic.isValid(badListener3)
# verify that missing parent created is not sendable, child is
def protoListener2(arg1, arg2=None):
pass
tName = (tName, 'stA', 'sstB')
subsubTopic = topicMgr.getOrCreateTopic(tName, protoListener2)
subTopic = topicMgr.getTopic( tName[:-1] )
assert not topicMgr.isTopicSpecified( tName[:-1] )
assert topicMgr.isTopicSpecified( tName )
assert subsubTopic.isValid(protoListener2)
class lib_pubsub_TopicMgr2_GetOrCreate_DefnProv(wtc.WidgetTestCase):
"""
Test TopicManager when one or more definition providers
can provide for some topic definitions.
"""
def test10_DefnProvider(self):
#
# Test the addition and clearing of definition providers
#
class DefnProvider:
pass
dp1 = DefnProvider()
dp2 = DefnProvider()
assert 1 == topicMgr.addDefnProvider(dp1)
assert 1 == topicMgr.addDefnProvider(dp1)
assert 2 == topicMgr.addDefnProvider(dp2)
assert 2 == topicMgr.addDefnProvider(dp2)
assert 2 == topicMgr.addDefnProvider(dp1)
topicMgr.clearDefnProviders()
assert 0 == topicMgr.getNumDefnProviders()
assert 1 == topicMgr.addDefnProvider(dp1)
topicMgr.clearDefnProviders()
def test20_UseProvider(self):
#
# Test the use of definition providers for topics. We create
# two so we can check that more than one can work together.
# One provides good definitions, one provides some with errors.
#
class DefnProvider(ITopicDefnProvider):
"""
Provide definitions for a root topic, subtopic, and
one subtopic whose parent is not defined here. It is easier
to use sub-only definitions.
"""
def __init__(self):
self.defns = {
('a',) : (dict(arg1='arg1 desc', arg2='arg2 desc'),
('arg1',) ),
('a', 'b') : (dict(arg1='arg1 desc', arg2='arg2 desc',
arg3='arg3 desc', arg4='arg2 desc'),
('arg1', 'arg3',) ),
# parent doesn't have defn
('a', 'c', 'd') : (
dict(arg1='arg1 desc', arg2='arg2 desc',
arg3='arg3 desc', arg4='arg4 desc',
arg5='arg5 desc', arg6='arg6 desc'),
('arg1', 'arg3', 'arg5',)),
}
def getDefn(self, topicNameTuple):
if topicNameTuple not in self.defns:
return None, None
defn = ArgSpecGiven()
defn.setAll( * self.defns[topicNameTuple] )
desc = '%s desc' % '.'.join(topicNameTuple)
return desc, defn
class DefnProviderErr(ITopicDefnProvider):
"""
Provide some definitions that have wrong arg spec. It is
easier to use the 'all-spec' for definitions, which provides
an opportunity for a different method of ArgSpecGiven.
"""
def __init__(self):
self.defns = {
('a', 'err1') : (# missing arg2
dict(arg1=''),
('arg1',) ),
('a', 'err2') : (# missing arg1
dict(arg2=''), ),
('a', 'err3') : (# arg1 is no longer required
dict(arg1='', arg2=''), ),
}
def getDefn(self, topicNameTuple):
if topicNameTuple not in self.defns:
return None, None
defn = ArgSpecGiven()
defn.setAll( * self.defns[topicNameTuple] )
desc = '%s desc' % '.'.join(topicNameTuple)
return desc, defn
topicMgr.addDefnProvider( DefnProvider() )
topicMgr.addDefnProvider( DefnProviderErr() )
# create some topics that will use defn provider
topic = topicMgr.getOrCreateTopic('a')
assert topic.getDescription() == 'a desc'
assert topic.isSendable()
topic = topicMgr.getOrCreateTopic('a.b')
assert topic.getDescription() == 'a.b desc'
assert topic.isSendable()
topic = topicMgr.getOrCreateTopic('a.c.d')
assert topic.getDescription() == 'a.c.d desc'
assert topic.isSendable()
assert not topicMgr.isTopicSpecified('a.c')
# check
parent = topicMgr.getTopic('a.c')
assert not parent.isSendable()
def protoListener(arg1, arg3, arg2=None, arg4=None): pass
parent = topicMgr.getOrCreateTopic('a.c', protoListener)
assert parent.isSendable()
assert topic.isSendable()
# now the erroneous ones:
def testRaises(topicName, expectMsg):
self.assertRaises(ListenerSpecInvalid, topicMgr.getOrCreateTopic,
topicName)
try:
assert topicMgr.getOrCreateTopic(topicName) is None
except ListenerSpecInvalid, exc:
# ok, did raise but is it correct message?
try:
str(exc).index(expectMsg)
except ValueError:
msg = 'Wrong message, expected \n "%s", got \n "%s"'
raise RuntimeError(msg % (expectMsg, str(exc)) )
testRaises('a.err1', 'Params [arg1] missing inherited [arg2] for topic "a.err1"')
testRaises('a.err2', 'Params [arg2] missing inherited [arg1] for topic "a.err2"')
testRaises('a.err3', 'Params [] missing inherited [arg1] for topic "a.err3" required args')
def test30_DelTopic(self):
#
# Test topic deletion
#
topicMgr.getOrCreateTopic('delTopic.b.c.d.e')
assert topicMgr.getTopic('delTopic.b.c.d.e') is not None
assert topicMgr.getTopic('delTopic.b.c.d').hasSubtopic('e')
assert topicMgr.getTopic('delTopic.b').hasSubtopic('c')
topicMgr.delTopic('delTopic.b.c')
assert not topicMgr.getTopic('delTopic.b').hasSubtopic('c')
assert topicMgr.getTopic('delTopic.b.c.d.e', okIfNone=True) is None
assert topicMgr.getTopic('delTopic.b.c.d', okIfNone=True) is None
assert topicMgr.getTopic('delTopic.b.c', okIfNone=True) is None
class lib_pubsub_TopicMgr3_TreeTraverser(wtc.WidgetTestCase):
expectedOutput = '''\
\-- Topic "a2"
\-- Topic "a"
\-- Topic "a"
\-- Topic "b"
\-- Topic "b"
\-- Topic "a"
\-- Topic "b"'''
def test1(self):
#
# Test printing of topic tree
#
root = topicMgr.getOrCreateTopic('a2')
topicMgr.getOrCreateTopic('a2.a.a')
topicMgr.getOrCreateTopic('a2.a.b')
topicMgr.getOrCreateTopic('a2.b.a')
topicMgr.getOrCreateTopic('a2.b.b')
from StringIO import StringIO
buffer = StringIO()
printTreeDocs(rootTopic=root, width=70, fileObj=buffer)
self.assertEqual( buffer.getvalue(), self.expectedOutput )
def test2(self):
#
# Test traversing with and without filtering, breadth and depth
#
class MyTraverser(ITopicTreeVisitor):
def __init__(self):
self.traverser = pub.TopicTreeTraverser(self)
self.calls = ''
self.topics = []
def traverse(self, rootTopic, **kwargs):
self.traverser.traverse(rootTopic, **kwargs)
def __append(self, val):
self.calls = self.calls + str(val)
def _startTraversal(self):
self.__append(1)
def _accept(self, topicObj):
self.__append(2)
# only accept topics at root or second level tree, or if tailName() is 'A'
return len(topicObj.getNameTuple()) <= 2 or topicObj.getNodeName() == 'A'
def _onTopic(self, topicObj):
self.__append(3)
self.topics.append(topicObj.getNodeName())
def _startChildren(self):
self.__append(4)
def _endChildren(self):
self.__append(5)
def _doneTraversal(self):
self.__append(6)
root = topicMgr.getOrCreateTopic('traversal')
topicMgr.getOrCreateTopic('traversal.a.A')
topicMgr.getOrCreateTopic('traversal.a.B.foo')
topicMgr.getOrCreateTopic('traversal.b.C')
topicMgr.getOrCreateTopic('traversal.b.D.bar')
def exe(expectCalls, expectTopics, **kwargs):
traverser = MyTraverser()
traverser.traverse(root, **kwargs)
self.assertEqual(traverser.topics, expectTopics)
self.assertEqual(traverser.calls, expectCalls)
exe(expectCalls = '13434345343455534345343455556',
expectTopics = ['traversal', 'a', 'A', 'B', 'foo', 'b', 'C', 'D', 'bar'],
onlyFiltered = False)
exe(expectCalls = '13433543354335454354543545456',
expectTopics = ['traversal', 'a', 'b', 'A', 'B', 'C', 'D', 'foo', 'bar'],
how = TopicTreeTraverser.BREADTH, onlyFiltered = False)
exe(expectCalls = '123423423452523422556',
expectTopics = ['traversal','a','A','b'])
exe(expectCalls = '123423235423254225456',
expectTopics = ['traversal','a','b','A'],
how = TopicTreeTraverser.BREADTH)
#---------------------------------------------------------------------------
if __name__ == '__main__':
unittest.main()