asy123456 发表于 2022-8-5 18:04

抖音直播录像程序 v2.0

【下载地址】链接: https://pan.baidu.com/s/1hE3uoGWoGFlafo4EJZlKjg?pwd=gx2q 提取码: gx2q(17.5MB)




static/image/hrline/4.gif


主要是代码方面的重构,因为是学习python有一段时间,回头看了眼自己写的代码,真的好垃圾,所以重构一下以前代码,想学习的朋友,也可以看看,重构后的代码更便于维护与理解。

说明一下各个文件作用:
# 抖音直播录屏2.0.py 执行源码
douyin_address.txt直播间分享地址复制到此处
douyin_config.ini 配置文件(想要保存的目录复制到此处就可以,程序会自动创建)
run.bat windows 批处理文件,使用windows 双击运行此文件运行程序
readme.txt 说明文件

static/image/hrline/5.gif


源码:
# 抖音直播录屏2.0

import subprocess
from pathlib import Path
import configparser
from threading import Timer,Thread
from loguru import logger
import requests
import re,time
from datetime import datetime

dir_filename = lambda filename:Path(__file__).parent.joinpath(filename)
_ffmepg = dir_filename('ffmpeg.exe')
logger.remove(handler_id=None) # 移除控制台输出
logger.add(dir_filename('douyin_log.log'))
headers = {
    'User-Agent':'Mozilla/5.0 (Linux; U; Android 8.1.0; en-US; Nexus 6P Build/OPM7.181205.001) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/57.0.2987.108 UCBrowser/12.11.1.1197 Mobile Safari/537.36'
}


def reading_a_configuration_file():
    """读取配置文件"""
    config = configparser.ConfigParser()
    config.read(dir_filename('douyin_config.ini'),encoding='utf8')
    save_dir = config.get('DouYin','dirpath')
    logger.debug(f"配置文件路径为:{save_dir}")
    return Path(save_dir)

def __ffmepg_command(urls,file_path_name):
    """录制命令"""
    cmd = [str(_ffmepg), "-y",
            "-v","verbose",
            "-timeout","2000000",
            "-loglevel","error",
            "-hide_banner",
            "-user_agent",headers["User-Agent"],
            "-analyzeduration","2147483647",
            "-probesize","2147483647",
            "-i",urls,
            '-bufsize','5000k',
            "-map","0",
            "-sn","-dn",
            '-max_muxing_queue_size','64',
            str(file_path_name)]
    return cmd

@logger.catch
def requests_func(url,urlparams=False,json_params=False):
    """请求函数"""
    res = requests.get(url,headers=headers,timeout=10)
    res.encoding = 'utf-8-sig'
    if urlparams == True:
      return res.url
    else:
      if json_params == True:
            return res.json()
      else:
            return res.text
   
@logger.catch
def get_roomid(shorturl):
    """获取roomid号"""
    long_url = requests_func(shorturl,urlparams=True)
    return re.findall(r'reflow\/(\d+)\?',long_url)

def title_filter(title: str):
    """
    转为Windows合法文件名
    """
    # 非法字符
    lst = ['\r', '\n', '\\', '/', ':', '*', '?', '"', '<', '>', '|']
    # 非法字符处理方式1
    for key in lst:
      title = title.replace(key, '-')
    # 非法字符处理方式2
    # title = title.translate(None, ''.join(lst))
    # 文件名+路径长度最大255,汉字*2,取60
    if len(title) > 60:
      title = title[:60]
    return title.strip()
   

def heartbeat_detection_content():
    """心跳检测内容"""
    global recording
    live_info_url = lambda roomid:f"https://webcast.amemv.com/webcast/room/reflow/info/?type_id=0&live_id=1&room_id={roomid}&app_id=1128"

    res_json = requests_func(live_info_url(get_roomid(shorturl)),json_params=True)
    try:
      room_ids_str = res_json["data"]["room"]["owner"]["own_room"]["room_ids_str"]
      res_json = requests_func(live_info_url(room_ids_str),json_params=True)
      flv_rtmp = res_json['data']['room']['stream_url']['flv_pull_url'].get('FULL_HD1') or \
            res_json['data']['room']['stream_url']['flv_pull_url'].get('HD1') or \
                res_json['data']['room']['stream_url'].get('rtmp_pull_url')
      nickname = res_json["data"]["room"]["owner"]['nickname']
      nickname = title_filter(nickname)
      recording = True
      logger.info(f'开始录制主播{nickname}')
      return flv_rtmp,nickname
    except:
      recording = False

def the_recording_function(datas):
    """录制函数"""
    if datas:
      live_fielename = f'{datas}-{datetime.now().strftime("%Y-%m-%d_%H_%M_%S")}.mp4'
      makedir = reading_a_configuration_file()
      makedir = Path(makedir)
      makedir.mkdir(parents=True,exist_ok=True) # 创建文件夹
      _path = makedir.joinpath(live_fielename) # 文件保存路径
      try:
            subprocess.Popen(__ffmepg_command(datas,_path)).wait()# 录制
      except Exception as e:
            logger.info('开播准备中')
            
def the_heartbeat_detection(times):
    """心跳检测执行函数"""
    datas = heartbeat_detection_content()
    Timer(times,the_heartbeat_detection,args=(times,))
    if recording == True:
      the_recording_function(datas)
    else:
      logger.info('还未开播')

def time_monitoring_function():
    """录制状态显示"""
    start_time = datetime.now()
    while True:
      time.sleep(5)
      if recording == False:
            start_time = datetime.now()
            print('\r等待录播中ing')
      else:
            print(f'\r正在录制:{datetime.now()-start_time}')

if __name__ == "__main__":   
    with open(dir_filename('douyin_address.txt'),'r',encoding='utf8') as f:
      shorturl = f.readlines()
    print(f'共享链接地址:{shorturl}')
    input('回车开始录制')
    t = Thread(target = time_monitoring_function,args=()) # 监控直播已经多少秒
    t.start()
    the_heartbeat_detection(10) # 每10秒检测一次

18634567866 发表于 2022-8-5 18:50

谢谢分享

letgzz 发表于 2022-8-5 18:51

谢谢分享!

876hhh 发表于 2022-8-5 19:10

谢谢大佬分享

43521786zxcv 发表于 2022-8-5 19:24


感谢楼主的分享

930ckj 发表于 2022-8-5 20:23


支持楼主,谢谢分享。

1997cbw 发表于 2022-8-5 21:05

感谢楼主分享

edision 发表于 2022-8-5 22:56

谢谢大牛

ws1443 发表于 2022-8-6 02:33

正需要,支持楼主,在大牛我只看好你!

kawaii2003 发表于 2022-8-6 05:31

谢谢大佬
页: [1] 2
查看完整版本: 抖音直播录像程序 v2.0