Files
TechDoc-BC2024/东北大学秦皇岛分校_白塔岭143队_百度智慧交通.md

18 KiB
Raw Blame History

百度智慧交通组技术报告

概述

在本次比赛中,主要任务仍可归纳为“循线 + 定点任务”相结合的方式,大体上和往年相似。主要难度在于循线过程中存在直角弯、三岔路口、十字路口等特殊元素,并且任务点附近没有地标作为任务定位标识,同时任务动作对定位准确性要求较高。

出于节省学校经费的考虑,我们没有选择直接购买官方车模,而是仅购买了指定的电机,单独购置了符合要求的上位机板卡,自制了车模结构和控制电路部分。

下位机设计

自定义通信协议

由于购买的 Nvidia 官方以及仿制官方的 Jetson Orin Nano 载板(以下简称 “nano”并没有直接引出 CAN 接口,并且对于 Linux 下 CAN 设备的调试开发也缺乏经验,所以选择了使用 nano 的 40pin 拓展接口中的串口设备与下位机主控板进行通信,对于其他模块的指令在下位机主控接收后转换成对应的 can 数据帧下发给其他模块,通信结构如下图所示。

控制器架构

在 nano 下发的指令中,需要控制车辆运动和执行机构动作,由于存在一些对参数“增量”修改的命令(如前进指定距离),为了避免因为通信出错导致类似指令漏发、多发,所以在通信机制中实现了校验和应答的机制。实现的自定义指令为定长帧,帧格式如下所示。

| 帧头 (1 byte) | 指令码 (1 byte) | 数据段 (8 byte) | CRC 校验值 (2 byte) |
| ------------ | -------------- | -------------- | ------------------ |
| 0xEB         | 0x30 - 0x6F    | 0x00 - 0xFF    | 0x00 - 0xFF        |

下位机主控设计

下位机主控

下位机主控采用了 AT32F403A 芯片作为主控,使用 配套的 AT32-WorkBench 工具能够非常快速的生成工程,加快开发速度。在功能上,下位机主控主要作为 nano 板卡和其他控制器模块的通信中继(将串口指令转发到 can 总线),并且还具备数字量输出(灯条和蜂鸣器控制)和电源分配功能。

25GA 电机控制板设计

25GA 电机控制板

25GA 电机为官方指定的电机型号,该电机为直流有刷电机,具有减速箱和光栅编码器。

为了实现电机的速度控制,搭建了 H 桥控制电路。考虑到车辆自重可能较大,电机可能长时间工作在较大负载下,故没有使用诸如 tb67f450 的集成 H 桥芯片,而是使用 NMOS 和预驱自行搭建,这样可以获得较低的内阻和较好的散热性能。

主控芯片采用 AT32F413CBT7主要考虑其具有 CAN 接口和充足的定时器等外设。

程序上,仅使用单速度环进行控制,采用简单的 PID 控制方式。由于车辆较重,该系统响应较为滞后,所以在参数整定上使用较大的积分值和较小的微分值,简单调参可以获得较好的控制效果。但仍然存在启动时负载较大,速度曲线上升迟缓的问题。经观察,大负载启动时,速度环输出上升速度足够快,基本上处于上限状态,故认为使用电流环作为内环并不能解决该问题。

输出上,使用互补 PWM虽然会造成一定的热损耗但是在一定程度上增加了响应性能。

此外,该电机存在编码器线数过低,在低速时不容易稳定控制的情况。在不改动编码器硬件的情况下,软件上配置 TIM 模块编码器模式为双边触发,对编码器信号进行四倍频。经观察,能够一定程度上提高低速状态下的控制精度。

z 轴蜗杆电机控制板设计

z 轴蜗杆电机控制板

z 轴使用带霍尔编码器的蜗杆电机,该电机减速机具有自锁特性,在位停止时仅需较小的保持力矩。该控制板硬件设计思路基本和 25GA 电机控制板一致,仅对于电源和主控进行了修改。主控更换了 AT32F425CBT7由于互补输出方式在输出为 0 时仍存在等占空比的互补输出,噪声较大,而对于该电机响应性能要求不高,所以更换为受限单极模式输出。 相似的,为了提高电机控制精度,也设置了编码器双边触发模式,对编码器信号进行四倍频。

x 轴步进电机控制板设计

x 轴步进电机控制板

由于 x 轴负载较小,为了提高机构运行速度和控制精度,使用步进电机驱动,齿轮齿条传动。为了简化开发流程,直接使用了 DRV8825 芯片(模块)对步进电机进行开环驱动,而为了保证位置的准确,又在步进电机轴体上粘贴了径向充磁的磁铁,利用次编码器检测转子位置。

程序上,提供了距离和绝对位置指令,上位机可设置使机构以指定速度、方向移动指定距离,或者以指定速度移动到设置位置。上电后先运行到限位位置校准零点。

6 路舵机控制板设计

6 路舵机控制板

电源扩展板设计

结构设计

机架结构设计

x 轴机构设计

z 轴机构设计

为了实现 z 轴快速运行并且可自锁的要求z 轴选用了带编码器的蜗杆减速电机使用同步带进行传动。电机固定在车辆底部z 轴机构固定在同步带上,使用压板在安装时张紧。

上位机程序设计

上位机环境配置概述

上位机程序主要基于 python 和 C/C++,为了保证和原有环境不产生冲突,使用 conda 创建了 python 的开发和运行环境。

在程序实际运行中,我们发现当推理开启时,一个模型的进程需要占用 2G 左右的内存,即使是在代码中开启了内存优化,并且指定大小,实际占用并没有明显区别。当开启多个推理进程时,会导致内存占满,使得某些进程被 kill 掉。由于规则限制最大只能使用 8G 内存的 nano 板卡,所以挂载了一个较大的 swap 分区,使用磁盘缓存保证不会出现物理内存占用过高的情况,实际测试下,未见明显的性能影响。

此外,为了保证开机自启的效果,我们构建了一个守护进程,使用 systemctl 管理我们的人机交互程序。

上位机程序总体设计

为了保证程序的高效运行,上位机程序总体采用“生产者-消费者”模型,所有进程由守护进程管理,首先由图像采集进程采集图像并储存在缓冲区中;推理进程从缓冲区获取图像,推理完成后将推理结果储存;控制线程需要推理结果时,向推理服务器请求暂存的推理结果即可,所有进程间请求均使用 zmq 的 “请求-应答” 模式。通过上述的结构模式,可以保证获取数据最新且请求速度较快。

执行机构接口程序设计

执行机构接口作为 python 模块导入控制进程的程序中,本体基于 C 编译成动态链接库,然后通过 cython 构建成 python 模块。

程序接收传入参数并封装成数据包,通过串口设备发送到下位机主控,然后接收下位机主控的响应,校验响应数据后返回。实现了应答超时和应答出错返回的机制。

图像采集服务器程序设计

图像采集服务器程序基于 C/C++ 编写,是一个独立的进程。该进程使用 opencv 读取摄像头图像每个摄像头对象构造一个采集和应答线程保证获取和响应的图像最新。使用了多语言支持的zeromq消息传递库建立一个请求-响应模型的 socket 服务器。相关代码如下:

cap = new cv::VideoCapture(index, cv::CAP_V4L2);
sleep(2);
cap->set(cv::CAP_PROP_FRAME_WIDTH, width);
cap->set(cv::CAP_PROP_FRAME_HEIGHT, height);
cap->set(cv::CAP_PROP_FPS, fps);

context = new zmq::context_t(1);
socket = new zmq::socket_t(*context, ZMQ_REP);
char zmq_bind_port[10] = {0};
sprintf(zmq_bind_port, "%d", port);
strcat(zmq_bind_addr, zmq_bind_port);
log_info("设置 %d zmq 地址 %s", index, zmq_bind_addr);
socket->bind(zmq_bind_addr);

推理服务器程序设计

在备赛过程中,我们注意到在小车通过岔路口时需要使用目标检测检测前方的转向牌从而确定下一步向哪一方向前进,同时巡线模型还需要不断请求前向摄像头以获取转向分量数据,为了解决这种不同的深度模型需要请求不同的图像源的问题,我们设计了推理服务器层。

以lane_infer巡线推理 server 为例,相关代码如下:

while True:
    camera_socket.send_string("")
    message = camera_socket.recv()
    np_array = np.frombuffer(message, dtype=np.uint8)
    frame = cv2.imdecode(np_array, cv2.IMREAD_COLOR)
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    result = predictor.infer(frame)
    with lock:
        response['data'] = result
# 处理 server 响应数据
def server_resp(lane_infer_port):
    global response
    logger.info("lane server thread init success")
    
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(f"tcp://*:{lane_infer_port}")
    logger.info("lane infer server init success")
    while True:
        message = socket.recv_string()
        with lock:
            socket.send_pyobj(response)

对于yolo_infer目标检测server有一个切换摄像头源的过程相关代码如下

def server_resp(yolo_infer_port):
    logger.info("yolo server thread init success")
    global response
    global src_camera_id

    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(f"tcp://*:{yolo_infer_port}")
    logger.info("yolo infer server init success")
    while not exit_event.is_set():
        try:
            message = socket.recv_string()
            if message != '':
                with lock1:
                    logger.error(message)
                    src_camera_id = int(message)
                    logger.info("switch camera")
                    socket.send_pyobj(response) 
            else:
                with lock2:
                    socket.send_pyobj(response)
                    response['data'] = np.array([])
        except zmq.Again:
            time.sleep(0.01)

    socket.close()
    context.term()

while not exit_event.is_set():
    with lock1:
        try:
            if src_camera_id == 1:
                camera1_socket.send_string("")
                message = camera1_socket.recv()
            else:
                camera2_socket.send_string("")
                message = camera2_socket.recv()
            np_array = np.frombuffer(message, dtype=np.uint8)
            with lock3:
                frame = cv2.imdecode(np_array, cv2.IMREAD_COLOR)
                start = True
        except:
            time.sleep(0.01)

模型推理,我们使用原生 PaddleInference以目标检测为例官方的 PaddleDection 套件带有许多冗余代码,降低执行效率,所以我们自己实现预处理-推理等过程。

import paddle.inference as paddle_infer
import numpy as np
import cv2
class Yolo_model_infer:
    def __init__(self, model_dir="./ppyoloe_plus_crn_t_auxhead_320_300e_coco", target_size=[320, 320]):
        # 初始化 paddle 推理
        self.model_dir = model_dir
        self.config = paddle_infer.Config(model_dir + "/model.pdmodel", model_dir + "/model.pdiparams")
        self.config.disable_glog_info()
        self.config.enable_use_gpu(500, 0)
        self.config.enable_memory_optim()
        self.config.switch_ir_optim()
        self.config.switch_use_feed_fetch_ops(False)
        self.predictor = paddle_infer.create_predictor(self.config)
        self.input_names = self.predictor.get_input_names()
        self.input_handle = self.predictor.get_input_handle(self.input_names[0])
        self.input_handle1 = self.predictor.get_input_handle(self.input_names[1])
        self.output_names = self.predictor.get_output_names()
        self.output_handle = self.predictor.get_output_handle(self.output_names[0])

        self.target_size = target_size
        origin_shape = (240, 320)
        resize_h, resize_w = self.target_size
        self.im_scale_y = resize_h / float(origin_shape[0])
        self.im_scale_x = resize_w / float(origin_shape[1])
        self.scale_info = np.array([[self.im_scale_y, self.im_scale_x]]).astype('float32')
    def infer(self,src) -> np.ndarray:
        image = self.preprocess(src)
        self.input_handle.copy_from_cpu(image)
        self.input_handle1.copy_from_cpu(self.scale_info)
        self.predictor.run()
        results = self.output_handle.copy_to_cpu()
        return results
    def preprocess(self,src):
        # resize
        # keep_ratio=0
        img = cv2.resize(
            src,
            None,
            None,
            fx=self.im_scale_x,
            fy=self.im_scale_y,
            interpolation=2)
        # NormalizeImage
        img = img.astype(np.float32, copy=False)
        scale = 1.0 / 255.0
        img *= scale
        # Permute
        img = img.transpose((2, 0, 1))
        img = np.array((img, ))
        # .astype('float32')
        return img
        

赛道线回归模型概述

官方采用的方案是采集摄像头第一视角+遥控器数据来实现深度巡线但实际上模型很难通过图像数据学习到遥控器数据的特征并且官方采用的模型结构过于简单只有简单的卷积层、池化层面对复杂的光线条件模型的泛化能力较差。我们的上位机选用Jetson Orin Nano内存和 GPU 推理能力都很强,因此不需要过于担心模型参数太大的问题,所以应当采用模型参数更大和结构更加先进的网络,面对不同的光线环境才能有较好的效果。

我们设计了一个网络,输入的图片经过特征提取层和全连接层后输出两个值:跟踪点的横纵坐标,回归出小车需要跟踪的赛道中点的 [x, y]坐标。因为中点坐标和图像数据有着强相关的特征,相比遥控器数据,更加容易被模型学习到这个特征。并且使用 paddlepaddle可以非常方便的使用很多性能更好的 backbone。

import paddle.vision.models as models
feature_extractor = models.resnet18(pretrained=True,num_classes=0)
feature_extractor = models.resnet34(pretrained=True,num_classes=0)
feature_extractor = models.mobilenet_v3_large(pretrained=True,num_classes=0)

我们使用性能更好的模型的特征提取层作为我们网络的 backbone并且通过 paddle 可以自动加载预训练参数,在训练网络时可以加快收敛。

数据集的制作过程比较复杂,首先需要实现遥控小车的功能,同时打开摄像头保存每一帧的图片。利用保存的图片和我们实现的打标软件标注赛道跟踪点。

导出的 json 格式如下所示[{"img_path": "200.jpg", "state": [126, 191]}, {"img_path": "201.jpg", "state": [128, 191]}]

经过多次尝试不同的特征提取层以及不同的超参数最后选择mobilenet_v3_large作为特征提取层在模型参数量和模型性能上取得最平衡的效果。

任务主程序设计

任务主程序将每个任务封装成对象,根据实际需求放入队列。程序执行时,任务对象依次出队。任务的执行按照 “搜寻-执行-后处理” 的方式运行,当执行某一任务时,先循环搜寻任务标志,待搜寻到可以进入任务的条件后,进入任务执行函数,函数内执行校准、夹持、伸展等任务。执行完成后,进入后处理阶段,该阶段可以设置下一任务的相关参数,并将执行机构运动到下一任务的预备位置。如果该任务不被开启,则执行一个任务不开启时才调用的函数,使执行机构运动到中立位置,保证下一任务开启时不会发生冲撞。

交互程序设计

为了满足日常调试的远程开启、关闭和日志实时查看以及正式比赛时的自动启动等需求,我们基于 python flask 、vue 和 element ui 开发了人机交互程序,功能包括推理服务器的日志查看、开启和关闭以及任务程序的日志查看、开启和关闭等功能。

推理服务器和任务程序的开启和关闭通过管理进程实现,相关代码如下:

elif data['type'] == 'operate_task':
    # 任务函数
    if data['content'] == 'run':
        task_run_flag.set()
        # 开启 task 进程前先关闭所有历史进程
        if task_process != None:
            task_process.terminate()
        time_record = time.perf_counter()
        task_process = Process(target=main_func, args=(task_run_flag,queue))
        task_process.start()
        logger.info("开启 task")
    elif data['content'] == 'stop':
        task_run_flag.clear()
        if task_process != None:
            task_process.terminate()
        logger.info(f"任务结束 用时{time.perf_counter() - time_record}s")
        logger.info("关闭  task")
    elif data['content'] == 'restart':
        pass

我们通过重定向loguru日志库并且通过 WebSocket 实现多人同时查看的实时日志系统,不同进程之间使用内存安全的共享队列传输日志信息,相关代码如下:

class Handler(logging.Handler):
    def emit(self, record):
        log_entry = self.format(record)
        _queue.put({'level': record.levelname.lower(), 'content': log_entry})
logger.remove()
handler = Handler()
logger.add(handler, format="{time:MM-DD HH:mm:ss} {message}", level="DEBUG")
def thread_function():
    global queue
    while True:
        try:
            log = queue.get()
            socketio.emit('log', log)
        except multiprocessing.Queue.Empty:
            pass