-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathstreamer_functional_tests.py
157 lines (126 loc) · 6.72 KB
/
streamer_functional_tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import os
import unittest
from gaetestbed import FunctionalTestCase
import pshb
import settings
import streamer
# Tests that need admin privileges look up the current user's email address,
# so the user-related environment variables are filled in before any test runs.
# Background: http://code.google.com/p/nose-gae/issues/detail?id=13
# NOTE(review): the USER_EMAIL value below looks like an address that was
# masked by the page this file was copied from -- confirm the intended value.
os.environ.update({
    'AUTH_DOMAIN': 'example.org',
    'USER_EMAIL': '[email protected]',
    'USER_ID': 'example',  # our test user is logged in
})
settings.OPEN_ACCESS = True  # make the Admin pages visible to all
class BaseSubscriptionHandlerTest(FunctionalTestCase, unittest.TestCase):
    """Base class adding a redirect-aware assertion for handler tests."""

    def assertOKAfterRedirect(self, response, expectedString=None):
        """Assert that *response* redirects to a page that loads successfully.

        response: the response under test; it is expected to be a redirect.
        expectedString: optional text that the redirect target must contain;
            the content check is skipped when this is empty or None.
        """
        # The handler should answer with a redirect to a sensible page ...
        self.assertRedirects(response)
        # ... and that page itself has to load without errors.
        landingPage = response.follow()
        self.assertOK(landingPage)
        if not expectedString:
            return
        landingPage.mustcontain(expectedString)
class SubscriptionsHandlerTest(BaseSubscriptionHandlerTest):
    """Functional tests for /subscriptions and the subscription tasks on /bgtasks."""

    APPLICATION = streamer.application

    def testCanShowSubscriptions(self):
        response = self.get('/subscriptions')
        self.assertOK(response)
        response.mustcontain("<title>Subscriptions</title>")

    def testCanDetectEmptySubscription(self):
        # Posting without a url is expected to fail with a server error.
        response = self.post('/subscriptions', data={}, expect_errors=True)
        self.assertEqual('500 Internal Server Error', response.status)

    def testCanAddNewSubscriptionUsingTaskQueue(self):
        data = {'function': 'handleNewSubscription', 'url': 'http://blog.oshineye.com/feeds/posts/default', 'nickname': 'ade'}
        # Only the datastore side effect is checked, so the response is not kept.
        self.post('/bgtasks', data=data, expect_errors=True)
        self.assertEqual(1, streamer.Subscription.all().count())

    def testAddingNewSubscriptionsUsingTaskQueueIsIdempotent(self):
        data = {'function': 'handleNewSubscription', 'url': 'http://blog.oshineye.com/feeds/posts/default', 'nickname': 'ade'}
        self.assertEqual(streamer.Subscription.all().count(), 0)

        self.post('/bgtasks', data=data, expect_errors=True)
        self.assertEqual(streamer.Subscription.all().count(), 1)

        # Submitting the same payload again must not create a second entity.
        self.post('/bgtasks', data=data, expect_errors=True)
        self.assertEqual(streamer.Subscription.all().count(), 1)

    def testAddingNoneExistentFeedsDoesNotRaiseAnException(self):
        data = {'function': 'handleNewSubscription', 'url': 'http://www.oshineye.com/404FromStreamer', 'nickname': 'ade'}
        response = self.post('/bgtasks', data=data, expect_errors=True)
        self.assertEqual('200 OK', response.status)

    def testDeletingFeedDoesNotRaiseAnException(self):
        # Store a subscription first so that there is something to delete.
        url = "http://example.org/atom"
        f = streamer.Subscription(url=url, hub="http://hub.example.org/", sourceUrl="http://example.org/", key_name=url)
        f.put()

        data = {'function': 'handleDeleteSubscription', 'url': url, 'nickname': 'ade'}
        response = self.post('/bgtasks', data=data, expect_errors=True)
        self.assertEqual('200 OK', response.status)

    def testEnqueuesTaskForNewSubscription(self):
        data = {'url': 'http://blog.oshineye.com/feeds/posts/default'}
        # Exactly one task should be queued by the post, which then redirects
        # to the subscriptions page.
        self.assertTasksInQueue(0)
        response = self.post('/subscriptions', data=data)
        self.assertTasksInQueue(1)
        self.assertOKAfterRedirect(response, "<title>Subscriptions</title>")
class PostsHandlerTest(FunctionalTestCase, unittest.TestCase):
    """Functional tests for the /posts page, including hub challenge requests."""

    APPLICATION = streamer.application

    def testCanShowPosts(self):
        response = self.get('/posts')
        self.assertOK(response)
        response.mustcontain("<title>Posts</title>")

    def testGoingToSlashIsTheSameAsPostsPage(self):
        responseFromPostsPage = self.get('/posts')
        responseFromSlash = self.get('/')
        self.assertEqual(responseFromPostsPage.body, responseFromSlash.body)
        self.assertEqual(responseFromPostsPage.status, responseFromSlash.status)

    def testCanAcceptHubChallengeForSubscriptionToExistingFeed(self):
        url = "http://example.org/atom"
        s = streamer.Subscription(url=url, hub="http://hub.example.org/", sourceUrl="http://example.org/", key_name=url)
        s.put()

        challenge = 'some hub challenge message'
        # NOTE(review): topic and challenge are interpolated into the query
        # string without URL-encoding -- confirm the test client copes with this.
        response = self.get('/posts?hub.mode=subscribe&hub.topic=%s&hub.challenge=%s' % (url, challenge))
        self.assertOK(response)
        # The challenge has to be echoed back in the response body.
        response.mustcontain(challenge)

    def testAcceptsHubChallengeForUnsubscriptionToDeletedFeed(self):
        # If the hub wants us to unsubscribe and we don't have the subscription then we should accept it
        url = "http://example.org/atom"
        challenge = 'some hub challenge message'
        response = self.get('/posts?hub.mode=unsubscribe&hub.topic=%s&hub.challenge=%s' % (url, challenge))
        self.assertOK(response)
        response.mustcontain(challenge)

    def testRejectsHubChallengeForUnsubscriptionToExistingFeed(self):
        # If the hub wants us to unsubscribe and we have the subscription then the hub is probably confused
        # so we honour the intentions of our users since they'll think we're still subscribed
        url = "http://example.org/atom"
        s = streamer.Subscription(url=url, hub="http://hub.example.org/", sourceUrl="http://example.org/", key_name=url)
        s.put()

        challenge = 'some hub challenge message'
        response = self.get('/posts?hub.mode=unsubscribe&hub.topic=%s&hub.challenge=%s' % (url, challenge), expect_errors=True)
        self.assertEqual('404 Not Found', response.status)
        response.mustcontain("Challenge failed for feed: %s with mode: %s" % (url, 'unsubscribe'))
class AboutHandlerTest(FunctionalTestCase, unittest.TestCase):
    """Functional test for the /about page."""

    APPLICATION = streamer.application

    def testCanShowAboutPage(self):
        # The page has to load and carry the expected title.
        aboutPage = self.get('/about')
        self.assertOK(aboutPage)
        aboutPage.mustcontain("<title>About</title>")
class AdminRefreshSubscriptionsHandlerTest(BaseSubscriptionHandlerTest):
    """Functional tests for the /admin/refreshSubscriptions page."""

    APPLICATION = streamer.application

    def testCanShowRefreshSubscriptionsPage(self):
        # The refresh page redirects; the redirect target is the subscriptions list.
        refreshResponse = self.get('/admin/refreshSubscriptions')
        self.assertOKAfterRedirect(refreshResponse, '<title>Subscriptions</title>')

    def testEnqueuesTaskPerSubscription(self):
        self.assertTasksInQueue(0)

        # Store five subscriptions with distinct urls.
        for suffix in range(5):
            feedUrl = "http://example.org/atom%d" % suffix
            streamer.Subscription(
                url=feedUrl,
                hub="http://hub.example.org/",
                sourceUrl="http://example.org/",
                key_name=feedUrl,
            ).put()

        self.get('/admin/refreshSubscriptions')

        # One task is expected for every stored subscription.
        storedCount = streamer.Subscription.all().count()
        self.assertTasksInQueue(storedCount)
class AdminAddSubscriptionHandlerTest(FunctionalTestCase, unittest.TestCase):
    """Functional test for the /admin/addSubscription page."""

    APPLICATION = streamer.application

    def testCanShowAddSubscriptionPage(self):
        # The page has to load and carry the expected title.
        addPage = self.get('/admin/addSubscription')
        self.assertOK(addPage)
        addPage.mustcontain("<title>Add Subscription</title>")
class AdminDeleteSubscriptionHandlerTest(FunctionalTestCase, unittest.TestCase):
    """Functional test for the /admin/deleteSubscription page."""

    APPLICATION = streamer.application

    def testCanShowDeleteSubscriptionPage(self):
        # The page has to load and carry the expected title.
        deletePage = self.get('/admin/deleteSubscription')
        self.assertOK(deletePage)
        deletePage.mustcontain("<title>Delete Subscription</title>")