forked from BobBuildTool/bob
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_cmd_build_builder_jobserver.py
41 lines (31 loc) · 1.1 KB
/
test_cmd_build_builder_jobserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from unittest import TestCase
from bob.cmds.build.builder import InternalJobServer, JobServerSemaphore
from bob.utils import runInEventLoop
import asyncio
class TestJobServer:
    """Helper pairing an ``InternalJobServer`` with a ``JobServerSemaphore``.

    This is a fixture used by the test cases in this file; it is not a test
    case itself.
    """

    # The "Test" name prefix matches pytest's default class collection
    # pattern. Opt out explicitly so pytest does not try to collect this
    # helper (and warn about its __init__). unittest only loads TestCase
    # subclasses, so it is unaffected.
    __test__ = False

    def __init__(self):
        # NOTE(review): 2 is presumably the number of job slots -- confirm
        # against InternalJobServer.
        self.__jobServer = InternalJobServer(2)
        # The semaphore is constructed from whatever getMakeFd() returns.
        # NOTE(review): the meaning of the second argument (False) is not
        # visible from this file -- confirm against JobServerSemaphore.
        self.__jobServerSem = JobServerSemaphore(
            self.__jobServer.getMakeFd(), False)

    async def testAcquire(self, cnt=1):
        """Acquire the semaphore ``cnt`` times, awaiting each acquisition.

        A ``cnt`` of zero or less acquires nothing. A bounded loop is used
        so that a negative count cannot keep acquiring indefinitely.
        """
        for _ in range(cnt):
            await self.__jobServerSem.acquire()

    def release(self):
        """Release the semaphore once."""
        self.__jobServerSem.release()
# Migrate to unittest.IsolatedAsyncioTestCase starting with Python 3.8
class TestJobserverTest(TestCase):
    """Acquire/release checks driven through the ``TestJobServer`` helper.

    Each public test method builds a coroutine and hands it to
    ``runInEventLoop``; the coroutine holds the actual scenario.
    """

    def testAcquire(self):
        """Run the acquire/release scenario via ``runInEventLoop``."""
        scenario = self._testAcquire()
        runInEventLoop(scenario)

    async def _testAcquire(self):
        """Acquire once and release, then acquire twice and release twice."""
        server = TestJobServer()

        # Single acquire/release round trip.
        await server.testAcquire()
        server.release()

        # Acquire two in one call, then give both back.
        await server.testAcquire(2)
        for _ in range(2):
            server.release()

    def testReleaseToOften(self):
        """Run the surplus-release scenario via ``runInEventLoop``."""
        scenario = self._testReleaseToOften()
        runInEventLoop(scenario)

    async def _testReleaseToOften(self):
        """Expect ``ValueError`` when releasing without a prior acquire."""
        server = TestJobServer()
        self.assertRaises(ValueError, server.release)