From f8d72f85d37a6c947942c15b52006d36c8c8834f Mon Sep 17 00:00:00 2001 From: killer069 <65385476+killer069@users.noreply.github.com> Date: Fri, 20 Aug 2021 23:37:30 +0530 Subject: [PATCH] LiveStream update, added type making it easy for playback --- play-dl/YouTube/classes/LiveStream.ts | 124 +++++++++++++++++++------- play-dl/YouTube/index.ts | 2 +- play-dl/YouTube/stream.ts | 86 ++++++------------ play-dl/index.ts | 5 +- 4 files changed, 123 insertions(+), 94 deletions(-) diff --git a/play-dl/YouTube/classes/LiveStream.ts b/play-dl/YouTube/classes/LiveStream.ts index 706641d..fb4d59c 100644 --- a/play-dl/YouTube/classes/LiveStream.ts +++ b/play-dl/YouTube/classes/LiveStream.ts @@ -1,5 +1,7 @@ import { PassThrough } from 'stream' import got from 'got' +import Request from 'got/dist/source/core'; +import { StreamType } from '../stream'; export interface FormatInterface{ url : string; @@ -8,35 +10,84 @@ export interface FormatInterface{ } export class LiveStreaming{ - smooth : boolean; - private __stream : PassThrough + type : StreamType + actual_live : boolean; + stream : PassThrough private format : FormatInterface private interval : number private packet_count : number private timer : NodeJS.Timer | null private segments_urls : string[] - constructor(format : FormatInterface, smooth : boolean){ - this.smooth = smooth || false + constructor(format : FormatInterface, actual_live : boolean){ + this.type = StreamType.Arbitrary + this.actual_live = actual_live || false this.format = format - this.__stream = new PassThrough({ highWaterMark : 10 * 1000 * 1000 }) + this.stream = new PassThrough({ highWaterMark : 10 * 1000 * 1000 }) this.segments_urls = [] this.packet_count = 0 this.interval = 0 this.timer = null - this.__stream.on('close', () => { + this.stream.on('close', () => { this.cleanup() - }) - if(this.smooth === true) this.__stream.pause() - this.start() + }); + (this.actual_live) ? this.live_loop() :this.start() } - async manifest_getter(){ - let response = await got(this.format.url) - this.segments_urls = response.body.split('\n').filter((x) => x.startsWith('https')) + private async live_loop(){ + if(this.stream.destroyed) this.cleanup() + await this.manifest_getter() + this.segments_urls.splice(0, (this.segments_urls.length / 2)) + if(this.packet_count === 0) this.packet_count = Number(this.segments_urls[0].split('index.m3u8/sq/')[1].split('/')[0]) + for await (let url of this.segments_urls){ + await (async () => { + return new Promise(async (resolve, reject) => { + if(Number(url.split('index.m3u8/sq/')[1].split('/')[0]) !== this.packet_count){ + resolve('') + return + } + let stream = this.got_stream(url) + stream.on('data', (chunk) => this.stream.write(chunk)) + stream.on('end', () => { + this.packet_count++ + resolve('') + }) + }) + })() + } + this.interval = 1 + this.timer = setTimeout(async () => { + await this.looping() + }, this.interval) } - get stream(){ - return this.__stream + private async looping(){ + if(this.stream.destroyed) this.cleanup() + await this.manifest_getter() + for await (let url of this.segments_urls){ + await (async () => { + return new Promise(async (resolve, reject) => { + if(Number(url.split('index.m3u8/sq/')[1].split('/')[0]) !== this.packet_count){ + resolve('') + return + } + let stream = this.got_stream(url) + stream.on('data', (chunk) => this.stream.write(chunk)) + stream.on('end', () => { + this.packet_count++ + resolve('') + }) + }) + })() + } + this.interval = 1 + this.timer = setTimeout(async () => { + await this.looping() + }, this.interval) + } + + private async manifest_getter(){ + let response = await got(this.format.url) + this.segments_urls = response.body.split('\n').filter((x) => x.startsWith('https')) } private cleanup(){ @@ -45,19 +96,19 @@ export class LiveStreaming{ this.packet_count = 0 } - async start(){ - if(this.__stream.destroyed) this.cleanup() + private async start(){ + if(this.stream.destroyed) this.cleanup() await this.manifest_getter() if(this.packet_count === 0) this.packet_count = Number(this.segments_urls[0].split('index.m3u8/sq/')[1].split('/')[0]) for await (let url of this.segments_urls){ await (async () => { return new Promise(async (resolve, reject) => { if(Number(url.split('index.m3u8/sq/')[1].split('/')[0]) !== this.packet_count){ - resolve('') - return + resolve('') + return } let stream = this.got_stream(url) - stream.on('data', (chunk) => this.__stream.write(chunk)) + stream.on('data', (chunk) => this.stream.write(chunk)) stream.on('end', () => { this.packet_count++ resolve('') @@ -67,10 +118,6 @@ export class LiveStreaming{ } this.interval = (this.segments_urls.length / 2) * 1000 this.timer = setTimeout(async () => { - if(this.smooth === true){ - this.__stream.resume() - this.smooth = false - } await this.start() }, this.interval) } @@ -81,16 +128,18 @@ export class LiveStreaming{ } export class LiveEnded{ - private __stream : PassThrough + type : StreamType + stream : PassThrough private format : FormatInterface private packet_count : number private segments_urls : string[] constructor(format : FormatInterface){ + this.type = StreamType.Arbitrary this.format = format - this.__stream = new PassThrough({ highWaterMark : 10 * 1000 * 1000 }) + this.stream = new PassThrough({ highWaterMark : 10 * 1000 * 1000 }) this.segments_urls = [] this.packet_count = 0 - this.__stream.on('close', () => { + this.stream.on('close', () => { this.cleanup() }) this.start() @@ -101,17 +150,13 @@ export class LiveEnded{ this.segments_urls = response.body.split('\n').filter((x) => x.startsWith('https')) } - get stream(){ - return this.__stream - } - private cleanup(){ this.segments_urls = [] this.packet_count = 0 } async start(){ - if(this.__stream.destroyed) this.cleanup() + if(this.stream.destroyed) this.cleanup() await this.manifest_getter() if(this.packet_count === 0) this.packet_count = Number(this.segments_urls[0].split('index.m3u8/sq/')[1].split('/')[0]) for await (let url of this.segments_urls){ @@ -122,7 +167,7 @@ export class LiveEnded{ return } let stream = this.got_stream(url) - stream.on('data', (chunk) => this.__stream.write(chunk)) + stream.on('data', (chunk) => this.stream.write(chunk)) stream.on('end', () => { this.packet_count++ resolve('') @@ -137,3 +182,18 @@ export class LiveEnded{ } } +export class Stream { + type : StreamType + private piping_stream : Request + private playing_stream : PassThrough + constructor(url : string, type : StreamType){ + this.type = type + this.piping_stream = got.stream(url) + this.playing_stream = new PassThrough({ highWaterMark : 10 * 1000 * 1000 }) + this.piping_stream.pipe(this.playing_stream) + } + + get stream(){ + return this.playing_stream + } +} \ No newline at end of file diff --git a/play-dl/YouTube/index.ts b/play-dl/YouTube/index.ts index 8cfd2ce..f758a1b 100644 --- a/play-dl/YouTube/index.ts +++ b/play-dl/YouTube/index.ts @@ -1,3 +1,3 @@ export { search } from './search' -export { stream, stream_from_info, stream_type } from './stream' +export { stream, stream_from_info } from './stream' export * from './utils' \ No newline at end of file diff --git a/play-dl/YouTube/stream.ts b/play-dl/YouTube/stream.ts index 4964d8b..b0693f6 100644 --- a/play-dl/YouTube/stream.ts +++ b/play-dl/YouTube/stream.ts @@ -1,10 +1,7 @@ -import got from "got" import { video_info } from "." -import { PassThrough } from 'stream' -import https from 'https' -import { FormatInterface, LiveEnded, LiveStreaming } from "./classes/LiveStream" +import { FormatInterface, LiveEnded, LiveStreaming, Stream } from "./classes/LiveStream" -enum StreamType{ +export enum StreamType{ Arbitrary = 'arbitrary', Raw = 'raw', OggOpus = 'ogg/opus', @@ -13,7 +10,7 @@ enum StreamType{ } interface StreamOptions { - smooth : boolean + actual_live : boolean } interface InfoData{ @@ -40,79 +37,59 @@ function parseAudioFormats(formats : any[]){ return result } -export async function stream(url : string, options : StreamOptions = { smooth : false }): Promise{ +export async function stream(url : string, options : StreamOptions = { actual_live : false }): Promise{ let info = await video_info(url) let final: any[] = []; - + let type : StreamType; if(info.LiveStreamData.isLive === true && info.LiveStreamData.hlsManifestUrl !== null) { - return await live_stream(info as InfoData, options.smooth) + return await live_stream(info as InfoData, options.actual_live) } let audioFormat = parseAudioFormats(info.format) let opusFormats = filterFormat(audioFormat, "opus") if(opusFormats.length === 0){ + type = StreamType.Arbitrary final.push(audioFormat[audioFormat.length - 1]) } else{ + type = StreamType.WebmOpus final.push(opusFormats[opusFormats.length - 1]) } - if(final.length === 0) final.push(info.format[info.format.length - 1]) - let piping_stream = got.stream(final[0].url, { - retry : 5, - headers: { - 'Connection': 'keep-alive', - 'Accept-Encoding': '', - 'Accept-Language': 'en-US,en;q=0.8', - 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36' - }, - agent : { - https : new https.Agent({ keepAlive : true }) - }, - http2 : true - }) - let playing_stream = new PassThrough({ highWaterMark: 10 * 1000 * 1000 }) - - piping_stream.pipe(playing_stream) - return playing_stream + if(final.length === 0) { + type = StreamType.Arbitrary + final.push(info.format[info.format.length - 1]) + } + + return new Stream(final[0].url, type) } -export async function stream_from_info(info : InfoData, options : StreamOptions){ +export async function stream_from_info(info : InfoData, options : StreamOptions = { actual_live : false }): Promise{ let final: any[] = []; - - if(info.LiveStreamData.isLive === true) { - return await live_stream(info as InfoData, options.smooth) + let type : StreamType; + if(info.LiveStreamData.isLive === true && info.LiveStreamData.hlsManifestUrl !== null) { + return await live_stream(info as InfoData, options.actual_live) } let audioFormat = parseAudioFormats(info.format) let opusFormats = filterFormat(audioFormat, "opus") if(opusFormats.length === 0){ + type = StreamType.Arbitrary final.push(audioFormat[audioFormat.length - 1]) } else{ + type = StreamType.WebmOpus final.push(opusFormats[opusFormats.length - 1]) } - if(final.length === 0) final.push(info.format[info.format.length - 1]) - let piping_stream = got.stream(final[0].url, { - retry : 5, - headers: { - 'Connection': 'keep-alive', - 'Accept-Encoding': '', - 'Accept-Language': 'en-US,en;q=0.8', - 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36' - }, - agent : { - https : new https.Agent({ keepAlive : true }) - }, - http2 : true - }) - let playing_stream = new PassThrough({ highWaterMark: 10 * 1000 * 1000 }) - - piping_stream.pipe(playing_stream) - return playing_stream + if(final.length === 0) { + type = StreamType.Arbitrary + final.push(info.format[info.format.length - 1]) + } + + return new Stream(final[0].url, type) } function filterFormat(formats : any[], codec : string){ @@ -123,12 +100,7 @@ function filterFormat(formats : any[], codec : string){ return result } -export function stream_type(info:InfoData): StreamType{ - if(info.LiveStreamData.isLive === true && info.LiveStreamData.hlsManifestUrl !== null) return StreamType.Arbitrary - else return StreamType.WebmOpus -} - -async function live_stream(info : InfoData, smooth : boolean): Promise{ +async function live_stream(info : InfoData, actual_live : boolean): Promise{ let res_144 : FormatInterface = { url : '', targetDurationSec : 0, @@ -140,10 +112,10 @@ async function live_stream(info : InfoData, smooth : boolean): Promise