youtube-dl 很紅但下指令好麻煩,做個下載機器人吧

前一陣子因為下架 youtube-dl 的事件鬧得很大,反而更多人知道這個神器(我也是其中一人)。但他是用 python 寫的 command line 程式,如果我在手機看到一段影片想要下載,還得找台電腦下指令下載,實在太麻煩了!於是,我想到用 line message api 做個簡單的下載 chatbot 來玩玩!這邊就分享我的開發過程。

需要使用以下程式或套件

  1. youtube-dl
  2. rclone
  3. flask
  4. celery
  5. redis
  6. line-chatbot-sdk
  7. docker

主要分享如何使用上述元件做出我的 chatbot 下載器,個別的介紹或教學就不在此篇文章討論。

先簡單描述一下整體流程架構:

  1. 想要透過 chatbot 接收我從 line 丟過去的影片連結,然後自動啟動下載
  2. 下載完成後會回傳影片的檔案名稱和大小到 line 對話
  3. 自動將下載完成的影片上傳到私人的 google drive
  4. 從 local 刪除剛剛下載的影片

個別的元素在過程中負責的任務:

  1. flask 作為 http server,提供 callback 給 line 接收使用者傳來的訊息,並將對應 task 透過 celery api 發送到 redis
  2. celery 作為 worker,監聽 redis 的訊息,收到影片網址後,透過 python 呼叫 youtube-dl 開始下載
  3. 下載完成,celery 透過 line mesage api 發送訊息給使用者, 並且繼續呼叫 rclone 將影片上傳到 google drive
  4. 上傳完成,發送訊息通知使用者,刪除檔案,結束

Line callback api

首先用 flask 寫一段 callback api 給 line 在收到使用者訊息時回扣使用。我習慣在 flask 這邊只簡單撰寫接收、檢查、回傳資料,邏輯處理的部份會另外放在 controller 裡面處理。

@line_message_webhook_api.route("/callback", methods=['POST'])
def receive_callback():
    signature = request.headers['X-Line-Signature']

    body = request.get_data(as_text=True)
    logging.debug(body)

    try:
        controller = LineCallbackController(body, signature)
        controller.handle()

    except InvalidSignatureError:
        logging.error("Invalid signature. Please check your channel access token/channel secret.")
        abort(400)

    return 'OK'

LineCallbackController

接下來看 LineCallbackController 的部分,可以看到這邊透過 line-chatbot-sdk,將收到的 json 解成物件的方式來處理比較方便。因為我們這次只要抓文字訊息,所以非文字訊息的部分都丟棄不處理。

為了未來還可以新增更多的指令功能,所以我將文字指令分析的程式碼又另外抽出來寫,因此可以看到在 dispatch_command 裡面有另外呼叫 TextCommandTranslator 這個物件,將指令翻譯成對應函數回傳到 controller 裡面,並且直接執行。

from linebot import WebhookParser
from linebot.exceptions import InvalidSignatureError
from linebot.models import Event

class LineCallbackController:

    def __init__(self, request_body: dict, signature: str):
        self._request_body = request_body
        self._signature = signature
        self._translator = TextCommandTranslator()

    def handle(self) -> bool:
        events = self._extract_events(self._request_body, self._signature)

        for event in events:
            if isinstance(event.message, TextMessage):
                self._dispatch_command(event)
            
            else:
                async_send_text_message(event.source.user_id, '這個我還看不懂所以略過哦')

        return True

    def _extract_events(self, request_body: dict, signature: str) -> List[Event]:
        parser = WebhookParser(settings.LINE_CHANNEL_SECRET)
        
        try:
            return parser.parse(request_body, signature)

        except InvalidSignatureError:
            raise

    def _dispatch_command(self, event: Event):
        command_func = self._translator.decode_command(event.message.text)
        if command_func:
            command_func(event)

        else:
            hint_msg = '不是指令清單裡,所以略過~\n\n可用指令如下:\n{}'.format(
                '\n'.join(TextCommandTranslator.command_map.keys())
            )
            async_send_text_message(event.source.user_id, hint_msg)

TextCommandTranslator

接著看 TextCommandTranslator 的部分,透過 validators 這個 python lib 來確認收到的文字訊息是否為連結,如果是就回傳啟動下載的函數。未來如果要增加其他的文字指令,只需要在 command_map 裡面新增指令字串和對應的函數即可。

import validators
from application.command.chat_bot_commands import (
    run_download_and_upload_task_command,
    run_get_user_id_command,
)

class TextCommandTranslator:

    # <command_key>: <command_func>
    command_map = {
        '我是誰': run_get_user_id_command,
    }

    def decode_command(self, command: str) -> Optional[Type]:
        # 先確認是否為網址,如果是網址,就啟動下載任務
        if validators.url(command):
            return run_download_and_upload_task_command
        else:
            commands = command.split(' ')
            return self.command_map.get(commands[0])

DownloadAndUploadTask

run_download_and_upload_task_command 這個函數其實很簡單,單純只是呼叫下面的 celery task 函數 do_download_and_upload_task 而已,所以這邊就不特別列出。

我將整個下載與上傳的流程包在 DownloadAndUploadTask 裡面,他會另外再去呼叫 YoutubedlDownloader 和 RcloneUploader 來處理上下載。

from celery_app import app

@app.task
def do_download_and_upload_task(user_id: str, url: str):
    DownloadAndUploadTask(user_id, url).execute()

class DownloadAndUploadTask:

    def __init__(self, user_id: str, url: str):
        self._user_id = user_id
        self._url = url

    def execute(self):
        send_text_message(self._user_id, '收到網址,啟動 youtube-dl,開始下載!')
        download_result, filename, filesize = self._start_download()

        if download_result:
            send_text_message(self._user_id, '下載完成!共 {} MB\n\n檔案名稱為:{}\n\n開始上傳 Google Drive'.format(filesize, filename))
            upload_result = self._start_upload(filename)
            send_text_message(self._user_id, '上傳成功!\n\n{}'.format(filename) if upload_result else '上傳失敗...\n\n{}'.format(filename))

        else:
            send_text_message(self._user_id, '下載失敗哭哭')

    def _start_download(self) -> Tuple[bool, str]:
        downloader = YoutubedlDownloader ()
        return downloader.download(self._url)

    def _start_upload(self, filename: str) -> bool:
        uploader = RcloneUploader()
        return uploader.upload(filename)

YoutubedlDownloader

終於來到我們這次的主要核心之一,YoutubedlDownloader。他會透過 python 的 subprocess 來執行 shell 指令,如此就可以達到透過程式碼來呼叫 youtube-dl。

流程是,先透過 youtube-dl 取得影片檔名,然後再下載的時候給定檔名,並且給定下載最高畫質和音質。

這邊要注意的是,通常最高畫質音質下載的時候會是分開成純影片檔和純音軌檔下載,所以下載後需要透過 ffmpeg 去合併。

我發現 output 非 mkv format 在 4.3.x ffmpeg 可以正常運作,但是在 4.1.2 卻無法。因為我是在 armbian 裡面跑,ffmpeg 目前最高穩定版只有 4.1.2,所以只好在指令中特別指定 output 為 mkv。

from typing import List, Tuple
from subprocess import check_output, CalledProcessError
import os
from utils.log import logging
import settings

class YoutubedlDownloader:

    # returns: (task result, file name, file size in MB)
    def download(self, url: str) -> Tuple[bool, str, int]:
        logging.info('Start fetch filename...')
        try:
            filename = self._get_filename(url)

        except CalledProcessError:
            logging.error('Fetch filename fail', exc_info=True)
            return False, None, 0

        logging.info('Find filename: {}, start to download...'.format(filename))
        try:
            shell_logs = self._run_download(filename, url)

        except CalledProcessError:
            logging.error('Download file error', exc_info=True)
            return False, None, 0

        logging.info('Download finish. Follows are log of youtube-dl')
        for shell_log in shell_logs:
            if shell_log:
                logging.info(shell_log)

        return True, filename, self._get_filesize(filename)

    def _get_filename(self, url: str) -> str:
        result = check_output(['youtube-dl', '--get-filename', url])
        # return result.decode().replace('\n', '')
        # 因為 ffmpeg 4.1.2 merge 的時候只能下 mkv,所以我們將副檔名改為 mkv
        # 4.3 以後就可以了
        filename = result.decode().replace('\n', '').split('.')[0]
        return filename + '.mkv'

    def _get_filesize(self, filename: str) -> int:
        return int(os.path.getsize(settings.DOWNLOAD_FOLDER_PATH.format(filename)) / 1024 / 1024)

    def _run_download(self, filename: str, url: str) -> List[str]:
        result = check_output([
            'youtube-dl',
            '-f', 'bestvideo+bestaudio',
            '--merge-output-format', 'mkv',  # 因應 ffmpeg 4.1.2 所以改成 mkv
            '-o', settings.DOWNLOAD_FOLDER_PATH.format(filename),
            url])
        return result.decode().split('\n')

RcloneUploader

核心之二就是 RcloneUploader。這邊一樣透過 python 的 subprocess 去呼叫 rclone 來將檔案上傳到 google drive 特定的資料夾下。

上傳完畢,在透過 shell command 直接將下載的影片檔案刪除,避免佔用 SBC 空間。

from subprocess import check_output, CalledProcessError
from utils.log import logging
import settings

class RcloneUploader:

    def upload(self, filename: str) -> bool:
        logging.info('Start upload file {}...'.format(filename))

        try:
            self._run_upload(filename)
            logging.info('Upload finish. Will remove file.')
            self._remove_file(filename)
            return True

        except CalledProcessError:
            logging.error('Upload file error', exc_info=True)
            return False

    def _run_upload(self, filename) -> None:
        check_output(['rclone', 'copy', settings.DOWNLOAD_FOLDER_PATH.format(filename), '{}:/'.format(settings.RCLONE_CONFIG_NAME)])

    def _remove_file(self, filename):
        check_output(['rm', settings.DOWNLOAD_FOLDER_PATH.format(filename)])

Celery

celery 設定分享如下。每個人的檔案目錄結構不同,因此 include 部分也會不同,核心概念是要把含有 celery task 的 module 路徑都登錄到此,這樣 celery 才能正確地抓到所有 task。

另外用 task route 將送訊息和下載任務分開到不同的 queue 的原因是,下載任務通常會比較久,如果下載任務把所有的 celery worker 都用滿,此時使用者在與機器人互動的時候,就沒有 worker 能夠回傳訊息了!所以我將兩者分配到不同的 queue 來避免這種情況發生。

from celery import Celery
from celery.schedules import crontab
import settings

app = Celery(
    settings.SERVICE_NAME,
    broker='redis://' + settings.REDIS_NODE,
    backend='redis://' + settings.REDIS_NODE,
    include=[
        'tasks.send_line_msg_tasks',
        'tasks.download_and_upload_task',
        'tasks.refresh_cache_for_wp_task',
    ]
)

app.conf.task_routes = {
    'tasks.send_line_msg_tasks.*': {'queue': settings.CELERY_CHATBOT_QUEUE_NAME},
    
    'tasks.refresh_cache_for_wp_task.*': {'queue': settings.CELERY_DOWNLOAD_QUEUE_NAME},
}

app.conf.update(
    result_expires=600,
)

if __name__ == '__main__':
    app.start()

entrypoint.sh

因為我會用同一個 image 起 3 個 container,分別是 http adapter (flask) 和兩個 celery worker,所以要在 entrypoint 裡面依照我給定的環境變數來切換這個 container 的主要工作

#!/bin/sh

# http_adapter mode
if [ "$SERVE_MODE" = "http_adapter" ]
then
	echo "SERVE MODE: http_adapter"
	uwsgi --ini uwsgi.ini

# celery worker mode
elif [ "$SERVE_MODE" = "celery_worker" ]
then
	echo "SERVE MODE: celery_worker"
	celery --app celery_app worker --concurrency $CELERY_WORKER_NUM --queues $CELERY_LISTEN_QUEUE_NAME --loglevel INFO

Docker-compose

我使用 docker-compose 來啟動所有的 container,相關設定分享如下。

其中需特別注意的是 rclone 的 configure 檔案,我透過掛入 host 的 config 設定檔讓 rclone 能夠上傳到 google drive。但我發現 rclone 在上傳的時候會在 config 存放路徑中複製一分暫時使用的設定檔,因此需要將 host rclone 存放 config 的 folder 權限設定為 777。因為是自己使用所以還好,但如果是多人共用的話就要用別的方法了。

version: '2.3'
services:
  redis:
    image: redis:6.0.9-alpine
    container_name: chatbot-service-redis
    restart: always
    networks:
      - chatbot-service
    mem_limit: 150M

  chatbot-service--http_adapter:
    build:
      context: .
      dockerfile: Dockerfile
    image: chatbot-service:latest
    container_name: chatbot-service--http_adapter
    restart: always
    ports:
      - "8001:8080"
    networks:
      - chatbot-service
    mem_limit: 400M
    logging:
      driver: "json-file"
      options:
        max-size: "1k"
        max-file: "3"
    environment:
      ENV: "prod"
      SERVE_MODE: "http_adapter"
      HTTP_PROC_NUM: 2
      REDIS_NODE: "redis:6379"
      
      CELERY_CHATBOT_QUEUE_NAME: "chatbot-queue"
      CELERY_DOWNLOAD_QUEUE_NAME: "downloader-queue"

      LINE_CHANNEL_ACCESS_TOKEN: "xxxx"
      LINE_CHANNEL_SECRET: "xxxx"

  chatbot-service--celery_worker--downloader:
    build:
      context: .
      dockerfile: Dockerfile
    image: chatbot-service:latest
    container_name: chatbot-service--celery_worker--downloader
    restart: always
    volumes:
      - $HOME/.config/rclone:/home/chatbot/.config/rclone  # linux 的 rclone 需改成 777
    depends_on:
      - redis
    networks:
      - chatbot-service
    mem_limit: 1G
    logging:
      driver: "json-file"
      options:
        max-size: "1k"
        max-file: "3"
    environment:
      ENV: "prod"
      SERVE_MODE: "celery_worker"
      REDIS_NODE: "redis:6379"
      CELERY_WORKER_NUM: 4
      CELERY_LISTEN_QUEUE_NAME: "downloader-queue"

      LINE_CHANNEL_ACCESS_TOKEN: "xxxxxx"
      LINE_CHANNEL_SECRET: "xxxxxx"

  chatbot-service--celery_worker--chatbot:
    build:
      context: .
      dockerfile: Dockerfile
    image: chatbot-service:latest
    container_name: chatbot-service--celery_worker--chatbot
    restart: always
    volumes:
      - $HOME/.config/rclone:/home/chatbot/.config/rclone  # linux 的 rclone 需改成 777
    depends_on:
      - redis
    networks:
      - chatbot-service
    mem_limit: 600M
    logging:
      driver: "json-file"
      options:
        max-size: "1k"
        max-file: "3"
    environment:
      ENV: "prod"
      SERVE_MODE: "celery_worker"
      REDIS_NODE: "redis:6379"
      CELERY_WORKER_NUM: 2
      CELERY_LISTEN_QUEUE_NAME: "chatbot-queue"

      LINE_CHANNEL_ACCESS_TOKEN: "xxxxx"
      LINE_CHANNEL_SECRET: "xxxxx"

networks:
  chatbot-service:
    name: chatbot-service-network
    driver: bridge

完成!

整個開發就到這裡完成!同時也做了一個基本的 chatbot 底,之後如果想要增加其他指令也會非常容易!整個處理過程也使用非同步的方式來處理,就不會堵住接收 line callback 的入口了!

youtube-dl 很紅但下指令好麻煩,做個下載機器人吧

相關文章

Python 好慢!datetime 處理優化

J
Written by J
雖然大學唸的是生物,但持著興趣與熱情自學,畢業後轉戰硬體工程師,與宅宅工程師們一起過著沒日沒夜的生活,做著台灣最薄的 intel 筆電,要與 macbook air 比拼。離開後,憑著一股傻勁與朋友創業,再度轉戰軟體工程師,一手扛起前後端、雙平台 app 開發,過程中雖跌跌撞撞,卻也累計不少經驗。可惜不是那 1% 的成功人士,於是加入其他成功人士的新創公司,專職開發後端。沒想到卻在採前人坑的過程中,拓寬了眼界,得到了深層的領悟。