Tornado 支援所謂的『websocket』交握,意思是連線一次後就可以持續維持通訊。因為我的資料庫想在每次更動 TEXT 時就查詢一次,感覺用 websocket 比較適合;否則使用 AsyncHTTPClient 的話,每次訪問都要重新連線一次。Client 端和 Server 端程式如下(下個範例是 Server 回傳 JSON 格式):
============================================================================
Client
============================================================================
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado import gen
from tornado.websocket import websocket_connect
class Client(object):
    """Self-reconnecting Tornado WebSocket client.

    Constructing an instance starts a connection attempt to ``url``,
    schedules a periodic keep-alive tick and then starts the IOLoop, so
    the constructor blocks until the loop is stopped.

    Args:
        url: WebSocket URL to connect to.
        timeout: Connect timeout in seconds; forwarded to
            ``websocket_connect`` as ``connect_timeout``.
    """

    # Interval between keep-alive ticks, in milliseconds.
    KEEP_ALIVE_INTERVAL_MS = 20000

    def __init__(self, url, timeout):
        self.url = url
        self.timeout = timeout
        self.ioloop = IOLoop.instance()
        # Current connection; None while disconnected.
        self.ws = None
        self.connect()
        PeriodicCallback(self.keep_alive, self.KEEP_ALIVE_INTERVAL_MS).start()
        self.ioloop.start()

    @gen.coroutine
    def connect(self):
        """Try to open the WebSocket and, on success, start the read loop."""
        print("trying to connect")
        try:
            # Previously the timeout was stored but never applied, so a
            # stalled connect waited for the library default instead.
            self.ws = yield websocket_connect(
                self.url, connect_timeout=self.timeout
            )
        except Exception:
            # Best effort: keep_alive() retries on its next tick.
            print("connection error")
        else:
            print("connected")
            self.run()

    @gen.coroutine
    def run(self):
        """Read messages until the server closes the connection.

        Received messages are discarded; a ``None`` message marks the
        connection as closed and clears ``self.ws``.
        """
        while True:
            msg = yield self.ws.read_message()
            if msg is None:
                print("connection closed")
                self.ws = None
                break

    def keep_alive(self):
        """Periodic tick: reconnect if disconnected, else send a ping text."""
        if self.ws is None:
            self.connect()
        else:
            # NOTE(review): write_message may raise if the socket closed
            # before run() noticed - confirm whether that needs handling.
            self.ws.write_message("keep alive")
if __name__ == "__main__":
    # Constructing the client blocks: its __init__ starts the IOLoop.
    client = Client(url="ws://10.0.0.240:3306", timeout=5)
============================================================================
Server
============================================================================
import logging
import tornado.web
import tornado.websocket
import tornado.ioloop
import tornado.options
from tornado.options import define, options
# Command-line option: the TCP port the server listens on.
define(
    "port",
    default=3306,
    type=int,
    help="run on the given port",
)
class Application(tornado.web.Application):
    """Tornado application that routes ``/`` to ``MainHandler``."""

    def __init__(self):
        routes = [(r"/", MainHandler)]
        # debug=True is passed through as an application setting.
        super(Application, self).__init__(routes, debug=True)
class MainHandler(tornado.websocket.WebSocketHandler):
    """WebSocket endpoint that logs connections and incoming messages."""

    def check_origin(self, origin):
        """Accept every origin unconditionally."""
        return True

    def open(self):
        """Log that a client has connected."""
        logging.info("A client connected.")

    def on_message(self, message):
        """Log the received message."""
        text = "message: {}".format(message)
        logging.info(text)

    def on_close(self):
        """Log that a client has disconnected."""
        logging.info("A client disconnected")
def main():
    """Parse command-line options, listen on the port and run the IOLoop."""
    tornado.options.parse_command_line()
    Application().listen(options.port)
    loop = tornado.ioloop.IOLoop.instance()
    loop.start()
if __name__ == "__main__":
    # Script entry point: start the WebSocket server.
    main()
沒有留言:
張貼留言