[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[circle] automated testing
From: |
malcolm handley |
Subject: |
[circle] automated testing |
Date: |
Thu, 24 Jun 2004 00:10:30 -0700 |
I decided to test circle in an isolated place before trying to embed it
in a larger program that currently uses freenet for distributing data
and wrote the test below.
I am thrilled with how easy it was to write both of these tests and how
short they are.
I have two questions, though:
1. Do you guys mind if I run code like this that makes nodes (which
will not use proxies, regardless of whether they are running on a
machine with NAT), connects them to the network and then destroys them
very soon after? Is that bad for the network overall?
2. The second test is a little hacky. I don't know how long I should
wait (the for "for x in range(1000)" part) for the publication of the
data to spread as far as it needs to. (In fact, I don't have much idea
how publication works.) Is there some way that I can determine how long
to wait? Is there something that I can read about how this works? I
couldn't find much on the web site.
Anyway, here the test is, in case anyone cares. I thought that you
might like to hear good news, rather than bugs, from me for once. To
run it place the whole thing in a file called nodetest.py in circlelib
and run "python nodetest.py -v".
>>>> begin nodetest.py
from __future__ import generators
import time
import unittest
import hash
import node
import proxy
import utility
class Test(unittest.TestCase):
def makeConnectedNodes(self, numNodes):
nodes = [node.Node() for x in range(numNodes)]
for n in nodes: n.start(proxy=None)
connected = 0
while not connected:
utility._daemon_action_timeout()
connected = 1
for n in nodes:
if not n.is_connected(): connected = 0
return nodes
def testRpc(self):
n0, n1 = self.makeConnectedNodes(numNodes=2)
class Handler:
def handle(self, request, address, callId):
return "rpc handled"
n0.add_handler(msg_name="nodetest", how=Handler())
finishedDict = {"finished":0}
def callMethod():
ticket, template, wait = n1.call(address=n0.address,
query=("nodetest",))
if wait: yield "call", (n1, ticket)
result = n1.get_reply(ticket,template)
self.assertEquals(result, "rpc handled")
finishedDict["finished"] = 1
utility.start_thread(callMethod())
while not finishedDict["finished"]:
utility._daemon_action_timeout()
def testData(self):
n0, n1 = self.makeConnectedNodes(numNodes=2)
publishedName = "testData-%f" % time.time()
publishedData = {"type":"test-data", "attr":"value"}
n0.publish(name=hash.hash_of(publishedName), info=publishedData)
for x in range(1000):
time.sleep(0.01)
utility._daemon_action_timeout()
pipe = n0.retrieve(hash.hash_of(publishedName))
foundAddress = None
try:
while not pipe.finished():
for address, data in pipe.read_all():
assert not foundAddress
self.assertEquals(data, publishedData)
foundAddress = address
utility._daemon_action_timeout()
time.sleep(1)
finally:
pipe.stop()
self.assertEquals(foundAddress, n0.address)
if __name__ == "__main__":
unittest.main()
<<<< end nodetest.py
- [circle] automated testing,
malcolm handley <=