付録F:プッシュ受信のhttp/Websocketの例

1. httpserverの例

#!/usr/bin/env python3

import tornado.ioloop
import tornado.web
import pprint
import sys
import os
import time
import json
import logging
import socket
import datetime

file_handler = logging.FileHandler('/tmp/http_server.log', 'a', encoding='utf-8')
formatter = logging.Formatter("%(asctime)-15s %(message)s")
file_handler.setFormatter(formatter)
logger = logging.getLogger()
logger.addHandler(file_handler)
logger.setLevel(logging.INFO)

# the ip of httpserver
local_IP = socket.gethostbyname(socket.gethostname())

class MyDumpHandler(tornado.web.RequestHandler):

    def post(self)
        logger.info("=" * 100 + "\n")
        logger.info(datetime.datetime.now())
        logger.info("\n[BODY]")
        try:
            body = self.request.body.decode('utf', 'backslashreplace')
            logger.info("try1:\n%s" % body)
            # logger.info('BODY: \n' + body)
        except BaseException as e:
            logger.info("BaseException1 %s" % e)
            try:
                body = self.request.body.decode('utf-8')
                logger.info("try2:\n%s" % body)
                # logger.info('BODY: \n' + body)
            except BaseException as e:
                logger.info("BaseException2 %s" % e)
                pass

        os.makedirs('/tmp/images', exist_ok=True)
        ip = self.request.remote_ip  # the ip of nebula-m
        file = self.request.files
        rsp_json = self.request.body_arguments["json"][0].decode("utf-8")

        # the push data field
        js_dict = json.loads(rsp_json)
        if js_dict["msg_id"] == 774:
            response_774_dict = ["camera_name", "device_id", "channel", "position", "img_id", "img_path", "lib_name", "lib_type", "person_addr", "person_age", "person_gender", "person_idcard","person_name", "snap_id", "similarity", "quality", "snap_feat", "snap_path", "trigger"]
            self.assert_resp_in_data(response_774_dict, js_dict["data"])
        elif js_dict["msg_id"] == 775:
            response_775_dict = ["device_id", "trigger"]
            self.assert_resp_in_data(response_775_dict, js_dict["data"])
        else:
            print(js_dict["msg_id"])

        # write the pushing picture
        t = str(time.time()).replace('.', '')
        if file:
            logger.info(file['snap'][0]['filename'] + ' saved')
            fobj = open('/tmp/images/' + ip + 'snap' + t + '.jpg', "wb")
            fobj.write(file['snap'][0]['body'])
            fobj.close()

            logger.info(file['snap_frame'][0]['filename'] + ' saved')
            fobj = open('/tmp/images/' + ip + 'snap_frame' + t + '.jpg', "wb")
            fobj.write(file['snap_frame'][0]['body'])
            fobj.close()
        if 'img' in file:
            logger.info(file['img'][0]['filename'] + ' saved')
            fobj = open('/tmp/images/' + ip + 'image' + t + '.jpg', "wb")
            fobj.write(file['img'][0]['body'])
            fobj.close()

    @staticmethod
    def assert_resp_in_data(resp_dict, resp_data):
        for item in resp_dict:
            assert item in resp_data, print("lost item %s" %item)

if __name__ == "__main__":
    try:
        print("\nHttp server listen on https://" + local_IP +
              ":8880 now ... (/tmp/http_server.log, /tmp/images)")
        tornado.web.Application([(r"/.*", MyDumpHandler), ]).listen(8080)
        tornado.ioloop.IOLoop.instance().start()
    except BaseException as e:
        print(e)
        pass

2. Websocketの例

#!/usr/bin/env python3
import time
import websocket
import ssl
import base64
import sys
import logging
IP = '10.5.2.81'   # the ip of nebula-m

key = ' '

file_handler = logging.FileHandler(
    '/tmp/ws.log', 'a', encoding='utf-8')
formatter = logging.Formatter("%(asctime)-15s %(message)s")
file_handler.setFormatter(formatter)
logger = logging.getLogger()
logger.addHandler(file_handler)
logger.setLevel(logging.INFO)

def on_message(ws, message):
    logger.info(message)
    d = json.loads(message)
    print(d['msg'])
    if 'data' in d:
        # print(dir(d['data']))
        # print(pretty_json(d['data']['img']))
        t = str(time.time()).replace('.', '')
        # print(json.dumps(d))

        response_777_dict = ["camera_name", "device_id", "channel", "img_id", "img_path", "lib_name","lib_type", "person_addr", "person_age", "person_gender", "person_idcard",
         "person_name", "position", "ranking", "similarity", "quality", "snap_id", "snap_buf", "snap_feat", "snap_path", "snap_frame", "trigger"]
        assert_resp_in_data(response_777_dict, d['data'])
        if d['data']['img'] != -1:
            data = d['data']['img'].split(',', 1)
            f = imgdata = base64.b64decode(data[1])
            fobj = open('/tmp/wsimages/' + IP + 'snap' + t + '.jpg', "wb")
            fobj.write(f)
            fobj.close()

def assert_resp_in_data(resp_dict, resp_data):
    for item in resp_dict:
        assert item in resp_data, print(item)

def on_error(ws, error):
    print(error)

def on_close(ws):
    print("### closed ###")

# Subscribe the push messages
def on_open(ws):
    def run(*args):
        ws.send('{"key":"%s", "msg_id":"776"}' % key)
        print("thread terminating...")
    thread.start_new_thread(run, ())

if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp('ws://' + IP + '/ws/',
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close,
                                on_open=on_open)
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

Last updated