前一陣子因為下架 youtube-dl 的事件鬧得很大,反而更多人知道這個神器(我也是其中一人)。但他是用 python 寫的 command line 程式,如果我在手機看到一段影片想要下載,還得找台電腦下指令下載,實在太麻煩了!於是,我想到用 line message api 做個簡單的下載 chatbot 來玩玩!這邊就分享我的開發過程。
目錄
架構概觀
需要使用以下程式或套件
- youtube-dl
- rclone
- flask
- celery
- redis
- line-chatbot-sdk
- docker
主要分享如何使用上述元件做出我的 chatbot 下載器,個別的介紹或教學就不在此篇文章討論。
先簡單描述一下整體流程架構:
- 想要透過 chatbot 接收我從 line 丟過去的影片連結,然後自動啟動下載
- 下載完成後會回傳影片的檔案名稱和大小到 line 對話
- 自動將下載完成的影片上傳到私人的 google drive
- 從 local 刪除剛剛下載的影片
個別的元素在過程中負責的任務:
- flask 作為 http server,提供 callback 給 line 接收使用者傳來的訊息,並將對應 task 透過 celery api 發送到 redis
- celery 作為 worker,監聽 redis 的訊息,收到影片網址後,透過 python 呼叫 youtube-dl 開始下載
- 下載完成,celery 透過 line mesage api 發送訊息給使用者, 並且繼續呼叫 rclone 將影片上傳到 google drive
- 上傳完成,發送訊息通知使用者,刪除檔案,結束
串接 Line callback api
首先用 flask 寫一段 callback api 給 line 在收到使用者訊息時回扣使用。我習慣在 flask 這邊只簡單撰寫接收、檢查、回傳資料,邏輯處理的部份會另外放在 controller 裡面處理。
@line_message_webhook_api.route("/callback", methods=['POST'])
def receive_callback():
signature = request.headers['X-Line-Signature']
body = request.get_data(as_text=True)
logging.debug(body)
try:
controller = LineCallbackController(body, signature)
controller.handle()
except InvalidSignatureError:
logging.error("Invalid signature. Please check your channel access token/channel secret.")
abort(400)
return 'OK'
實現 LineCallbackController
接下來看 LineCallbackController 的部分,可以看到這邊透過 line-chatbot-sdk,將收到的 json 解成物件的方式來處理比較方便。因為我們這次只要抓文字訊息,所以非文字訊息的部分都丟棄不處理。
為了未來還可以新增更多的指令功能,所以我將文字指令分析的程式碼又另外抽出來寫,因此可以看到在 dispatch_command 裡面有另外呼叫 TextCommandTranslator 這個物件,將指令翻譯成對應函數回傳到 controller 裡面,並且直接執行。
from linebot import WebhookParser
from linebot.exceptions import InvalidSignatureError
from linebot.models import Event
class LineCallbackController:
def __init__(self, request_body: dict, signature: str):
self._request_body = request_body
self._signature = signature
self._translator = TextCommandTranslator()
def handle(self) -> bool:
events = self._extract_events(self._request_body, self._signature)
for event in events:
if isinstance(event.message, TextMessage):
self._dispatch_command(event)
else:
async_send_text_message(event.source.user_id, '這個我還看不懂所以略過哦')
return True
def _extract_events(self, request_body: dict, signature: str) -> List[Event]:
parser = WebhookParser(settings.LINE_CHANNEL_SECRET)
try:
return parser.parse(request_body, signature)
except InvalidSignatureError:
raise
def _dispatch_command(self, event: Event):
command_func = self._translator.decode_command(event.message.text)
if command_func:
command_func(event)
else:
hint_msg = '不是指令清單裡,所以略過~\n\n可用指令如下:\n{}'.format(
'\n'.join(TextCommandTranslator.command_map.keys())
)
async_send_text_message(event.source.user_id, hint_msg)
實現 TextCommandTranslator
接著看 TextCommandTranslator 的部分,透過 validators 這個 python lib 來確認收到的文字訊息是否為連結,如果是就回傳啟動下載的函數。未來如果要增加其他的文字指令,只需要在 command_map 裡面新增指令字串和對應的函數即可。
import validators
from application.command.chat_bot_commands import (
run_download_and_upload_task_command,
run_get_user_id_command,
)
class TextCommandTranslator:
# <command_key>: <command_func>
command_map = {
'我是誰': run_get_user_id_command,
}
def decode_command(self, command: str) -> Optional[Type]:
# 先確認是否為網址,如果是網址,就啟動下載任務
if validators.url(command):
return run_download_and_upload_task_command
else:
commands = command.split(' ')
return self.command_map.get(commands[0])
實現 DownloadAndUploadTask
run_download_and_upload_task_command 這個函數其實很簡單,單純只是呼叫下面的 celery task 函數 do_download_and_upload_task 而已,所以這邊就不特別列出。
我將整個下載與上傳的流程包在 DownloadAndUploadTask 裡面,他會另外再去呼叫 YoutubedlDownloader 和 RcloneUploader 來處理上下載。
from celery_app import app
@app.task
def do_download_and_upload_task(user_id: str, url: str):
DownloadAndUploadTask(user_id, url).execute()
class DownloadAndUploadTask:
def __init__(self, user_id: str, url: str):
self._user_id = user_id
self._url = url
def execute(self):
send_text_message(self._user_id, '收到網址,啟動 youtube-dl,開始下載!')
download_result, filename, filesize = self._start_download()
if download_result:
send_text_message(self._user_id, '下載完成!共 {} MB\n\n檔案名稱為:{}\n\n開始上傳 Google Drive'.format(filesize, filename))
upload_result = self._start_upload(filename)
send_text_message(self._user_id, '上傳成功!\n\n{}'.format(filename) if upload_result else '上傳失敗...\n\n{}'.format(filename))
else:
send_text_message(self._user_id, '下載失敗哭哭')
def _start_download(self) -> Tuple[bool, str]:
downloader = YoutubedlDownloader ()
return downloader.download(self._url)
def _start_upload(self, filename: str) -> bool:
uploader = RcloneUploader()
return uploader.upload(filename)
實現 YoutubedlDownloader
終於來到我們這次的主要核心之一,YoutubedlDownloader。他會透過 python 的 subprocess 來執行 shell 指令,如此就可以達到透過程式碼來呼叫 youtube-dl。
流程是,先透過 youtube-dl 取得影片檔名,然後再下載的時候給定檔名,並且給定下載最高畫質和音質。
這邊要注意的是,通常最高畫質音質下載的時候會是分開成純影片檔和純音軌檔下載,所以下載後需要透過 ffmpeg 去合併。
我發現 output 非 mkv format 在 4.3.x ffmpeg 可以正常運作,但是在 4.1.2 卻無法。因為我是在 armbian 裡面跑,ffmpeg 目前最高穩定版只有 4.1.2,所以只好在指令中特別指定 output 為 mkv。
from typing import List, Tuple
from subprocess import check_output, CalledProcessError
import os
from utils.log import logging
import settings
class YoutubedlDownloader:
# returns: (task result, file name, file size in MB)
def download(self, url: str) -> Tuple[bool, str, int]:
logging.info('Start fetch filename...')
try:
filename = self._get_filename(url)
except CalledProcessError:
logging.error('Fetch filename fail', exc_info=True)
return False, None, 0
logging.info('Find filename: {}, start to download...'.format(filename))
try:
shell_logs = self._run_download(filename, url)
except CalledProcessError:
logging.error('Download file error', exc_info=True)
return False, None, 0
logging.info('Download finish. Follows are log of youtube-dl')
for shell_log in shell_logs:
if shell_log:
logging.info(shell_log)
return True, filename, self._get_filesize(filename)
def _get_filename(self, url: str) -> str:
result = check_output(['youtube-dl', '--get-filename', url])
# return result.decode().replace('\n', '')
# 因為 ffmpeg 4.1.2 merge 的時候只能下 mkv,所以我們將副檔名改為 mkv
# 4.3 以後就可以了
filename = result.decode().replace('\n', '').split('.')[0]
return filename + '.mkv'
def _get_filesize(self, filename: str) -> int:
return int(os.path.getsize(settings.DOWNLOAD_FOLDER_PATH.format(filename)) / 1024 / 1024)
def _run_download(self, filename: str, url: str) -> List[str]:
result = check_output([
'youtube-dl',
'-f', 'bestvideo+bestaudio',
'--merge-output-format', 'mkv', # 因應 ffmpeg 4.1.2 所以改成 mkv
'-o', settings.DOWNLOAD_FOLDER_PATH.format(filename),
url])
return result.decode().split('\n')
實現 RcloneUploader
核心之二就是 RcloneUploader。這邊一樣透過 python 的 subprocess 去呼叫 rclone 來將檔案上傳到 google drive 特定的資料夾下。
上傳完畢,在透過 shell command 直接將下載的影片檔案刪除,避免佔用 SBC 空間。
from subprocess import check_output, CalledProcessError
from utils.log import logging
import settings
class RcloneUploader:
def upload(self, filename: str) -> bool:
logging.info('Start upload file {}...'.format(filename))
try:
self._run_upload(filename)
logging.info('Upload finish. Will remove file.')
self._remove_file(filename)
return True
except CalledProcessError:
logging.error('Upload file error', exc_info=True)
return False
def _run_upload(self, filename) -> None:
check_output(['rclone', 'copy', settings.DOWNLOAD_FOLDER_PATH.format(filename), '{}:/'.format(settings.RCLONE_CONFIG_NAME)])
def _remove_file(self, filename):
check_output(['rm', settings.DOWNLOAD_FOLDER_PATH.format(filename)])
設定 Celery
celery 設定分享如下。每個人的檔案目錄結構不同,因此 include 部分也會不同,核心概念是要把含有 celery task 的 module 路徑都登錄到此,這樣 celery 才能正確地抓到所有 task。
另外用 task route 將送訊息和下載任務分開到不同的 queue 的原因是,下載任務通常會比較久,如果下載任務把所有的 celery worker 都用滿,此時使用者在與機器人互動的時候,就沒有 worker 能夠回傳訊息了!所以我將兩者分配到不同的 queue 來避免這種情況發生。
from celery import Celery
from celery.schedules import crontab
import settings
app = Celery(
settings.SERVICE_NAME,
broker='redis://' + settings.REDIS_NODE,
backend='redis://' + settings.REDIS_NODE,
include=[
'tasks.send_line_msg_tasks',
'tasks.download_and_upload_task',
'tasks.refresh_cache_for_wp_task',
]
)
app.conf.task_routes = {
'tasks.send_line_msg_tasks.*': {'queue': settings.CELERY_CHATBOT_QUEUE_NAME},
'tasks.refresh_cache_for_wp_task.*': {'queue': settings.CELERY_DOWNLOAD_QUEUE_NAME},
}
app.conf.update(
result_expires=600,
)
if __name__ == '__main__':
app.start()
準備 entrypoint.sh
因為我會用同一個 image 起 3 個 container,分別是 http adapter (flask) 和兩個 celery worker,所以要在 entrypoint 裡面依照我給定的環境變數來切換這個 container 的主要工作
#!/bin/sh
# http_adapter mode
if [ "$SERVE_MODE" = "http_adapter" ]
then
echo "SERVE MODE: http_adapter"
uwsgi --ini uwsgi.ini
# celery worker mode
elif [ "$SERVE_MODE" = "celery_worker" ]
then
echo "SERVE MODE: celery_worker"
celery --app celery_app worker --concurrency $CELERY_WORKER_NUM --queues $CELERY_LISTEN_QUEUE_NAME --loglevel INFO
準備 Docker-compose
我使用 docker-compose 來啟動所有的 container,相關設定分享如下。
其中需特別注意的是 rclone 的 configure 檔案,我透過掛入 host 的 config 設定檔讓 rclone 能夠上傳到 google drive。但我發現 rclone 在上傳的時候會在 config 存放路徑中複製一分暫時使用的設定檔,因此需要將 host rclone 存放 config 的 folder 權限設定為 777。因為是自己使用所以還好,但如果是多人共用的話就要用別的方法了。
version: '2.3'
services:
redis:
image: redis:6.0.9-alpine
container_name: chatbot-service-redis
restart: always
networks:
- chatbot-service
mem_limit: 150M
chatbot-service--http_adapter:
build:
context: .
dockerfile: Dockerfile
image: chatbot-service:latest
container_name: chatbot-service--http_adapter
restart: always
ports:
- "8001:8080"
networks:
- chatbot-service
mem_limit: 400M
logging:
driver: "json-file"
options:
max-size: "1k"
max-file: "3"
environment:
ENV: "prod"
SERVE_MODE: "http_adapter"
HTTP_PROC_NUM: 2
REDIS_NODE: "redis:6379"
CELERY_CHATBOT_QUEUE_NAME: "chatbot-queue"
CELERY_DOWNLOAD_QUEUE_NAME: "downloader-queue"
LINE_CHANNEL_ACCESS_TOKEN: "xxxx"
LINE_CHANNEL_SECRET: "xxxx"
chatbot-service--celery_worker--downloader:
build:
context: .
dockerfile: Dockerfile
image: chatbot-service:latest
container_name: chatbot-service--celery_worker--downloader
restart: always
volumes:
- $HOME/.config/rclone:/home/chatbot/.config/rclone # linux 的 rclone 需改成 777
depends_on:
- redis
networks:
- chatbot-service
mem_limit: 1G
logging:
driver: "json-file"
options:
max-size: "1k"
max-file: "3"
environment:
ENV: "prod"
SERVE_MODE: "celery_worker"
REDIS_NODE: "redis:6379"
CELERY_WORKER_NUM: 4
CELERY_LISTEN_QUEUE_NAME: "downloader-queue"
LINE_CHANNEL_ACCESS_TOKEN: "xxxxxx"
LINE_CHANNEL_SECRET: "xxxxxx"
chatbot-service--celery_worker--chatbot:
build:
context: .
dockerfile: Dockerfile
image: chatbot-service:latest
container_name: chatbot-service--celery_worker--chatbot
restart: always
volumes:
- $HOME/.config/rclone:/home/chatbot/.config/rclone # linux 的 rclone 需改成 777
depends_on:
- redis
networks:
- chatbot-service
mem_limit: 600M
logging:
driver: "json-file"
options:
max-size: "1k"
max-file: "3"
environment:
ENV: "prod"
SERVE_MODE: "celery_worker"
REDIS_NODE: "redis:6379"
CELERY_WORKER_NUM: 2
CELERY_LISTEN_QUEUE_NAME: "chatbot-queue"
LINE_CHANNEL_ACCESS_TOKEN: "xxxxx"
LINE_CHANNEL_SECRET: "xxxxx"
networks:
chatbot-service:
name: chatbot-service-network
driver: bridge
Deploy And Test
整個開發就到這裡完成!同時也做了一個基本的 chatbot 底,之後如果想要增加其他指令也會非常容易!整個處理過程也使用非同步的方式來處理,就不會堵住接收 line callback 的入口了!