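# [Python] plain-text view / copy code  (forum copy artifact, kept as a comment so the file parses)
# Douyin live-stream recorder 2.0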
import subprocess
from pathlib import Path
import configparser
from threading import Timer,Thread
from loguru import logger
import requests
import re,time
from datetime import datetime
def dir_filename(filename):
    """Return the path of *filename* inside the directory that holds this script."""
    return Path(__file__).parent / filename


# ffmpeg executable, looked up next to this script.
_ffmepg = dir_filename('ffmpeg.exe')

# Remove the existing loguru handlers (silences console output) and log to a
# file next to this script instead.
logger.remove(handler_id=None)
logger.add(dir_filename('douyin_log.log'))

# Headers sent with every HTTP request; the User-Agent is also passed to ffmpeg.
headers = {
    'User-Agent': (
        'Mozilla/5.0 (Linux; U; Android 8.1.0; en-US; Nexus 6P Build/OPM7.181205.001) '
        'AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/57.0.2987.108 '
        'UCBrowser/12.11.1.1197 Mobile Safari/537.36'
    ),
}
def reading_a_configuration_file():
    """Read the recording output directory from ``douyin_config.ini``.

    Returns:
        Path: the ``dirpath`` option of the ``[DouYin]`` section.
    """
    parser = configparser.ConfigParser()
    ini_path = dir_filename('douyin_config.ini')
    parser.read(ini_path, encoding='utf8')

    target_dir = parser.get('DouYin', 'dirpath')
    logger.debug(f"配置文件路径为:{target_dir}")
    return Path(target_dir)
def __ffmepg_command(urls, file_path_name):
    """Build the ffmpeg argument list that records a stream into a file.

    Args:
        urls: stream address handed to ffmpeg's ``-i`` option.
        file_path_name: output file path; converted with ``str``.

    Returns:
        list: argument vector, starting with the ffmpeg executable path.
    """
    executable = str(_ffmepg)

    # Options placed before the input.
    # NOTE(review): "-v" and "-loglevel" look like the same ffmpeg option, so
    # only one of the two values probably takes effect -- confirm.
    before_input = [
        "-y",
        "-v", "verbose",
        "-timeout", "2000000",
        "-loglevel", "error",
        "-hide_banner",
        "-user_agent", headers["User-Agent"],
        "-analyzeduration", "2147483647",
        "-probesize", "2147483647",
    ]

    # Options placed between the input and the output file.
    after_input = [
        '-bufsize', '5000k',
        "-map", "0",
        "-sn", "-dn",
        '-max_muxing_queue_size', '64',
    ]

    return [executable, *before_input, "-i", urls, *after_input, str(file_path_name)]
@logger.catch
def requests_func(url, urlparams=False, json_params=False):
    """Send a GET request and return one view of the response.

    Args:
        url: address to request.
        urlparams: when equal to ``True``, return the final response URL.
        json_params: when equal to ``True`` (and ``urlparams`` is not),
            return the decoded JSON body.

    Returns:
        The response URL, the parsed JSON, or the response text, depending on
        the flags. The function is wrapped in ``logger.catch``; with loguru's
        default settings a failure is presumably logged and ``None`` returned.
    """
    response = requests.get(url, headers=headers, timeout=10)
    response.encoding = 'utf-8-sig'

    # Equality (not truthiness) is kept on purpose so that only values equal
    # to True select a branch, exactly as before.
    if urlparams == True:  # noqa: E712
        return response.url
    if json_params == True:  # noqa: E712
        return response.json()
    return response.text
@logger.catch
def get_roomid(shorturl):
    """Resolve a share short link to the numeric room id.

    Args:
        shorturl: sequence whose first item is the short link, e.g. the lines
            read from the address file.

    Returns:
        str: the digits captured from ``reflow/<digits>?`` in the URL the
        short link redirects to.
    """
    # Lines read with readlines() keep their trailing newline; strip it so it
    # is not sent as part of the requested URL.
    short_link = shorturl[0].strip()
    long_url = requests_func(short_link, urlparams=True)
    return re.findall(r'reflow\/(\d+)\?', long_url)[0]
def title_filter(title: str):
    """Turn *title* into a string usable as a Windows file name.

    Every forbidden character (carriage return, line feed, backslash, slash,
    colon, asterisk, question mark, double quote, angle brackets and pipe) is
    replaced with a hyphen. The result is then cut to at most 60 characters
    and surrounding whitespace is removed.
    """
    forbidden = '\r\n\\/:*?"<>|'
    to_hyphen = str.maketrans(forbidden, '-' * len(forbidden))
    cleaned = title.translate(to_hyphen)

    # File name + path is limited to 255 characters and Chinese characters
    # count double, so 60 is used as the cap.
    return cleaned[:60].strip()
def heartbeat_detection_content():
    """Query the room info and work out whether a stream can be recorded.

    Reads the module-level ``shorturl`` and sets the module-level
    ``recording`` flag: ``True`` when the room info could be read,
    ``False`` otherwise.

    Returns:
        tuple | None: ``(stream_url, nickname)`` when the room info could be
        read, where ``nickname`` has been made file-name safe; ``None`` when
        any of the expected fields is missing.
    """
    global recording

    def live_info_url(roomid):
        return f"https://webcast.amemv.com/webcast/room/reflow/info/?type_id=0&live_id=1&room_id={roomid}&app_id=1128"

    res_json = requests_func(live_info_url(get_roomid(shorturl)), json_params=True)
    try:
        # The first response is only used to read the owner's own room id,
        # which is then queried for the stream addresses.
        room_ids_str = res_json["data"]["room"]["owner"]["own_room"]["room_ids_str"][0]
        res_json = requests_func(live_info_url(room_ids_str), json_params=True)

        room = res_json['data']['room']
        stream_url = room['stream_url']
        flv_urls = stream_url['flv_pull_url']
        # Preference order: FULL_HD1 flv, then HD1 flv, then the rtmp address.
        flv_rtmp = (
            flv_urls.get('FULL_HD1')
            or flv_urls.get('HD1')
            or stream_url.get('rtmp_pull_url')
        )
        nickname = title_filter(room["owner"]['nickname'])
    except (KeyError, IndexError, TypeError, AttributeError):
        # A missing field or a failed request (requests_func yielding None)
        # means there is nothing to record. Only lookup failures are caught so
        # that KeyboardInterrupt / SystemExit are no longer swallowed.
        recording = False
        return None

    recording = True
    logger.info(f'开始录制主播{nickname}')
    return flv_rtmp, nickname
def the_recording_function(datas):
    """Record one stream with ffmpeg and wait until the process exits.

    Args:
        datas: ``(stream_url, nickname)`` pair; a falsy value makes the call
            a no-op.
    """
    if not datas:
        return

    nickname = datas[1]
    stamp = datetime.now().strftime("%Y-%m-%d_%H_%M_%S")
    live_fielename = f'{nickname}-{stamp}.mp4'

    # Make sure the configured output directory exists.
    output_dir = Path(reading_a_configuration_file())
    output_dir.mkdir(parents=True, exist_ok=True)
    target_file = output_dir / live_fielename

    try:
        command = __ffmepg_command(datas[0], target_file)
        ffmpeg_process = subprocess.Popen(command)
        ffmpeg_process.wait()  # blocks for the whole recording
    except Exception:
        logger.info('开播准备中')
def the_heartbeat_detection(times):
    """Run one live check, record if a stream is available, then re-arm.

    Args:
        times: number of seconds to wait before the next check.
    """
    global recording

    datas = heartbeat_detection_content()
    if recording:
        the_recording_function(datas)  # blocks until ffmpeg exits
        # The recording is over; let the status display go back to "waiting".
        recording = False
    else:
        logger.info('还未开播')

    # Schedule the next check. The timer must actually be started (creating
    # it alone does nothing), and it is armed only after the recording has
    # returned so that a second check cannot launch a parallel ffmpeg for the
    # same stream.
    next_check = Timer(times, the_heartbeat_detection, args=(times,))
    next_check.start()
def time_monitoring_function():
    """Print the recording status every five seconds, forever.

    While the module-level ``recording`` flag equals ``False`` the start time
    is reset and a waiting message is printed; otherwise the time elapsed
    since the last reset is printed.
    """
    started_at = datetime.now()
    while True:
        time.sleep(5)
        if recording == False:  # noqa: E712 - equality kept to match the flag check exactly
            started_at = datetime.now()
            print('\r等待录播中ing')
            continue
        elapsed = datetime.now() - started_at
        print(f'\r正在录制:{elapsed}')
if __name__ == "__main__":
    # Script entry point: read the share link, wait for the user, then start
    # the status display thread and the first live check.
    with open(dir_filename('douyin_address.txt'), 'r', encoding='utf8') as f:
        shorturl = f.readlines()
    print(f'共享链接地址:{shorturl[0]}')
    input('回车开始录制')

    # Define the flag before the monitor thread starts: the first live check
    # can take longer than the monitor's first 5-second sleep, and the thread
    # would otherwise read an undefined name.
    recording = False

    # Status display: shows how long the current recording has been running.
    t = Thread(target=time_monitoring_function, args=())
    t.start()
    the_heartbeat_detection(10)  # check every 10 seconds