import { PassThrough } from 'stream' import got from 'got' import Request from 'got/dist/source/core'; import { StreamType } from '../stream'; export interface FormatInterface{ url : string; targetDurationSec : number; maxDvrDurationSec : number } export class LiveStreaming{ type : StreamType stream : PassThrough private base_url : string private url : string private interval : number private packet_count : number private timer : NodeJS.Timer | null private segments_urls : string[] constructor(dash_url : string, target_interval : number){ this.type = StreamType.Arbitrary this.url = dash_url this.base_url = '' this.stream = new PassThrough({ highWaterMark : 10 * 1000 * 1000 }) this.segments_urls = [] this.packet_count = 0 this.timer = null this.interval = target_interval * 1000 || 0 this.stream.on('close', () => { this.cleanup() }); this.start() } private async dash_getter(){ let response = await got(this.url) let audioFormat = response.body.split('')[0].split('') if(audioFormat[audioFormat.length - 1] === '') audioFormat.pop() this.base_url = audioFormat[audioFormat.length - 1].split('')[1].split('')[0] let list = audioFormat[audioFormat.length - 1].split('')[1].split('')[0] this.segments_urls = list.replace(new RegExp('') if(this.segments_urls[this.segments_urls.length - 1] === '') this.segments_urls.pop() } private cleanup(){ clearTimeout(this.timer as NodeJS.Timer) this.timer = null this.url = '' this.base_url = '' this.segments_urls = [] this.packet_count = 0 this.interval = 0 } private async start(){ if(this.stream.destroyed){ this.cleanup() return } await this.dash_getter() if(this.packet_count === 0) this.packet_count = Number(this.segments_urls[0].split('sq/')[1].split('/')[0]) for await (let segment of this.segments_urls){ if(Number(segment.split('sq/')[1].split('/')[0]) !== this.packet_count){ continue } await (async () => { return new Promise(async (resolve, reject) => { let stream = got.stream(this.base_url + segment) stream.on('data', (chunk: any) => this.stream.write(chunk)) stream.on('end', () => { this.packet_count++ resolve('') }) }) })() } this.timer = setTimeout(() => { this.start() }, this.interval) } } export class LiveEnded{ type : StreamType stream : PassThrough private url : string; private base_url : string; private packet_count : number private segments_urls : string[] constructor(dash_url : string){ this.type = StreamType.Arbitrary this.url = dash_url this.base_url = '' this.stream = new PassThrough({ highWaterMark : 10 * 1000 * 1000 }) this.segments_urls = [] this.packet_count = 0 this.stream.on('close', () => { this.cleanup() }) this.start() } private async dash_getter(){ let response = await got(this.url) let audioFormat = response.body.split('')[0].split('') if(audioFormat[audioFormat.length - 1] === '') audioFormat.pop() this.base_url = audioFormat[audioFormat.length - 1].split('')[1].split('')[0] let list = audioFormat[audioFormat.length - 1].split('')[1].split('')[0] this.segments_urls = list.replace(new RegExp('') if(this.segments_urls[this.segments_urls.length - 1] === '') this.segments_urls.pop() } private cleanup(){ this.url = '' this.base_url = '' this.segments_urls = [] this.packet_count = 0 } private async start(){ if(this.stream.destroyed){ this.cleanup() return } await this.dash_getter() if(this.packet_count === 0) this.packet_count = Number(this.segments_urls[0].split('sq/')[1].split('/')[0]) for await (let segment of this.segments_urls){ if(Number(segment.split('sq/')[1].split('/')[0]) !== this.packet_count){ continue } await (async () => { return new Promise(async (resolve, reject) => { let stream = got.stream(this.base_url + segment) stream.on('data', (chunk: any) => this.stream.write(chunk)) stream.on('end', () => { this.packet_count++ resolve('') }) }) })() } } } export class Stream { type : StreamType stream : PassThrough private url : string private bytes_count : number; private per_sec_bytes : number; private duration : number; private timer : NodeJS.Timer | null constructor(url : string, type : StreamType, duration : number){ this.url = url this.type = type this.stream = new PassThrough({ highWaterMark : 10 * 1000 * 1000 }) this.bytes_count = 0 this.per_sec_bytes = 0 this.timer = null this.duration = duration; (duration > 300) ? this.loop_start() : this.normal_start() } private cleanup(){ clearTimeout(this.timer as NodeJS.Timer) this.timer = null this.url = '' this.bytes_count = 0 this.per_sec_bytes = 0 } private normal_start(){ if(this.stream.destroyed){ this.cleanup() return } let stream = got.stream(this.url) stream.pipe(this.stream) } private loop_start(){ if(this.stream.destroyed){ this.cleanup() return } let stream = got.stream(this.url) stream.once('data', () => { this.per_sec_bytes = Math.ceil((stream.downloadProgress.total as number)/this.duration) }) stream.on('data', (chunk: any) => { this.bytes_count += chunk.length this.stream.write(chunk) }) stream.on('data', () => { if(this.bytes_count > (this.per_sec_bytes * 300)){ stream.destroy() } }) this.timer = setTimeout(() => { this.loop() }, 290 * 1000) } private loop(){ if(this.stream.destroyed){ this.cleanup() return } let absolute_bytes : number = 0 let stream = got.stream(this.url, { headers : { "range" : `bytes=${this.bytes_count}-` } }) stream.on('data', (chunk: any) => { absolute_bytes += chunk.length this.bytes_count += chunk.length this.stream.write(chunk) }) stream.on('data', () => { if(absolute_bytes > (this.per_sec_bytes * 300)){ stream.destroy() } }) stream.on('end', () => { this.cleanup() }) this.timer = setTimeout(() => { this.loop() }, 300 * 1000) } }