付録F:プッシュ受信のhttp/Websocketの例
1. httpserverの例
#!/usr/bin/env python3
import tornado.ioloop
import tornado.web
import pprint
import sys
import os
import time
import json
import logging
import socket
import datetime
file_handler = logging.FileHandler('/tmp/http_server.log', 'a', encoding='utf-8')
formatter = logging.Formatter("%(asctime)-15s %(message)s")
file_handler.setFormatter(formatter)
logger = logging.getLogger()
logger.addHandler(file_handler)
logger.setLevel(logging.INFO)
# the ip of httpserver
local_IP = socket.gethostbyname(socket.gethostname())
class MyDumpHandler(tornado.web.RequestHandler):
def post(self)
logger.info("=" * 100 + "\n")
logger.info(datetime.datetime.now())
logger.info("\n[BODY]")
try:
body = self.request.body.decode('utf', 'backslashreplace')
logger.info("try1:\n%s" % body)
# logger.info('BODY: \n' + body)
except BaseException as e:
logger.info("BaseException1 %s" % e)
try:
body = self.request.body.decode('utf-8')
logger.info("try2:\n%s" % body)
# logger.info('BODY: \n' + body)
except BaseException as e:
logger.info("BaseException2 %s" % e)
pass
os.makedirs('/tmp/images', exist_ok=True)
ip = self.request.remote_ip # the ip of nebula-m
file = self.request.files
rsp_json = self.request.body_arguments["json"][0].decode("utf-8")
# the push data field
js_dict = json.loads(rsp_json)
if js_dict["msg_id"] == 774:
response_774_dict = ["camera_name", "device_id", "channel", "position", "img_id", "img_path", "lib_name", "lib_type", "person_addr", "person_age", "person_gender", "person_idcard","person_name", "snap_id", "similarity", "quality", "snap_feat", "snap_path", "trigger"]
self.assert_resp_in_data(response_774_dict, js_dict["data"])
elif js_dict["msg_id"] == 775:
response_775_dict = ["device_id", "trigger"]
self.assert_resp_in_data(response_775_dict, js_dict["data"])
else:
print(js_dict["msg_id"])
# write the pushing picture
t = str(time.time()).replace('.', '')
if file:
logger.info(file['snap'][0]['filename'] + ' saved')
fobj = open('/tmp/images/' + ip + 'snap' + t + '.jpg', "wb")
fobj.write(file['snap'][0]['body'])
fobj.close()
logger.info(file['snap_frame'][0]['filename'] + ' saved')
fobj = open('/tmp/images/' + ip + 'snap_frame' + t + '.jpg', "wb")
fobj.write(file['snap_frame'][0]['body'])
fobj.close()
if 'img' in file:
logger.info(file['img'][0]['filename'] + ' saved')
fobj = open('/tmp/images/' + ip + 'image' + t + '.jpg', "wb")
fobj.write(file['img'][0]['body'])
fobj.close()
@staticmethod
def assert_resp_in_data(resp_dict, resp_data):
for item in resp_dict:
assert item in resp_data, print("lost item %s" %item)
if __name__ == "__main__":
try:
print("\nHttp server listen on https://" + local_IP +
":8880 now ... (/tmp/http_server.log, /tmp/images)")
tornado.web.Application([(r"/.*", MyDumpHandler), ]).listen(8080)
tornado.ioloop.IOLoop.instance().start()
except BaseException as e:
print(e)
pass
2. Websocketの例
#!/usr/bin/env python3
import time
import websocket
import ssl
import base64
import sys
import logging
IP = '10.5.2.81' # the ip of nebula-m
key = ' '
file_handler = logging.FileHandler(
'/tmp/ws.log', 'a', encoding='utf-8')
formatter = logging.Formatter("%(asctime)-15s %(message)s")
file_handler.setFormatter(formatter)
logger = logging.getLogger()
logger.addHandler(file_handler)
logger.setLevel(logging.INFO)
def on_message(ws, message):
logger.info(message)
d = json.loads(message)
print(d['msg'])
if 'data' in d:
# print(dir(d['data']))
# print(pretty_json(d['data']['img']))
t = str(time.time()).replace('.', '')
# print(json.dumps(d))
response_777_dict = ["camera_name", "device_id", "channel", "img_id", "img_path", "lib_name","lib_type", "person_addr", "person_age", "person_gender", "person_idcard",
"person_name", "position", "ranking", "similarity", "quality", "snap_id", "snap_buf", "snap_feat", "snap_path", "snap_frame", "trigger"]
assert_resp_in_data(response_777_dict, d['data'])
if d['data']['img'] != -1:
data = d['data']['img'].split(',', 1)
f = imgdata = base64.b64decode(data[1])
fobj = open('/tmp/wsimages/' + IP + 'snap' + t + '.jpg', "wb")
fobj.write(f)
fobj.close()
def assert_resp_in_data(resp_dict, resp_data):
for item in resp_dict:
assert item in resp_data, print(item)
def on_error(ws, error):
print(error)
def on_close(ws):
print("### closed ###")
# Subscribe the push messages
def on_open(ws):
def run(*args):
ws.send('{"key":"%s", "msg_id":"776"}' % key)
print("thread terminating...")
thread.start_new_thread(run, ())
if __name__ == "__main__":
websocket.enableTrace(True)
ws = websocket.WebSocketApp('ws://' + IP + '/ws/',
on_message=on_message,
on_error=on_error,
on_close=on_close,
on_open=on_open)
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
Last updated