From c68a57aaf4fea07057d8fdcca352a291a970fcef Mon Sep 17 00:00:00 2001 From: killer069 <65385476+killer069@users.noreply.github.com> Date: Tue, 21 Sep 2021 10:22:43 +0530 Subject: [PATCH] Some changes to Stream creation --- play-dl/SoundCloud/classes.ts | 21 +++++++-------- play-dl/YouTube/classes/LiveStream.ts | 38 +++++++++++++-------------- 2 files changed, 28 insertions(+), 31 deletions(-) diff --git a/play-dl/SoundCloud/classes.ts b/play-dl/SoundCloud/classes.ts index 8cdeb21..edf477f 100644 --- a/play-dl/SoundCloud/classes.ts +++ b/play-dl/SoundCloud/classes.ts @@ -154,9 +154,8 @@ export class SoundCloudPlaylist { } } -export class Stream { +export class Stream extends PassThrough { type: StreamType; - stream: PassThrough; private url: string; private playing_count: number; private downloaded_time: number; @@ -166,9 +165,9 @@ export class Stream { private time: number[]; private segment_urls: string[]; constructor(url: string, type: StreamType = StreamType.Arbitrary) { + super({ highWaterMark: 10 * 1000 * 1000 }); this.type = type; this.url = url; - this.stream = new PassThrough({ highWaterMark: 10 * 1000 * 1000 }); this.playing_count = 0; this.downloaded_time = 0; this.request = null; @@ -176,14 +175,14 @@ export class Stream { this.data_ended = false; this.time = []; this.segment_urls = []; - this.stream.on('close', () => { + this.on('close', () => { this.cleanup(); }); - this.stream.on('pause', () => { + this.on('pause', () => { this.playing_count++; if (this.data_ended) { this.cleanup(); - this.stream.removeAllListeners('pause'); + this.removeAllListeners('pause'); } else if (this.playing_count === 110) { this.playing_count = 0; this.start(); @@ -209,7 +208,7 @@ export class Stream { } private async start() { - if (this.stream.destroyed) { + if (this.destroyed) { this.cleanup(); return; } @@ -222,7 +221,7 @@ export class Stream { } private async loop() { - if (this.stream.destroyed) { + if (this.destroyed) { this.cleanup(); return; } @@ -235,18 +234,18 @@ export class Stream { const stream = await request_stream(this.segment_urls.shift() as string).catch((err: Error) => err); if (stream instanceof Error) throw stream; - stream.pipe(this.stream, { end: false }); + stream.pipe(this, { end: false }); stream.on('end', () => { if (this.downloaded_time >= 300) return; else this.loop(); }); stream.once('error', (err) => { - this.stream.emit('error', err); + this.emit('error', err); }); } private cleanup() { - this.request?.unpipe(this.stream); + this.request?.unpipe(this); this.request?.destroy(); this.url = ''; this.playing_count = 0; diff --git a/play-dl/YouTube/classes/LiveStream.ts b/play-dl/YouTube/classes/LiveStream.ts index 806be47..5bc249d 100644 --- a/play-dl/YouTube/classes/LiveStream.ts +++ b/play-dl/YouTube/classes/LiveStream.ts @@ -10,9 +10,8 @@ export interface FormatInterface { maxDvrDurationSec: number; } -export class LiveStreaming { +export class LiveStreaming extends PassThrough { type: StreamType; - stream: PassThrough; private base_url: string; private url: string; private interval: number; @@ -23,10 +22,10 @@ export class LiveStreaming { private segments_urls: string[]; private request: IncomingMessage | null; constructor(dash_url: string, target_interval: number, video_url: string) { + super({ highWaterMark: 10 * 1000 * 1000 }); this.type = StreamType.Arbitrary; this.url = dash_url; this.base_url = ''; - this.stream = new PassThrough({ highWaterMark: 10 * 1000 * 1000 }); this.segments_urls = []; this.packet_count = 0; this.request = null; @@ -36,7 +35,7 @@ export class LiveStreaming { this.dash_timer = setTimeout(() => { this.dash_updater(); }, 1800000); - this.stream.on('close', () => { + this.on('close', () => { this.cleanup(); }); this.start(); @@ -72,7 +71,7 @@ export class LiveStreaming { private cleanup() { clearTimeout(this.timer as NodeJS.Timer); clearTimeout(this.dash_timer as NodeJS.Timer); - this.request?.unpipe(this.stream); + this.request?.unpipe(this); this.request?.destroy(); this.dash_timer = null; this.video_url = ''; @@ -86,7 +85,7 @@ export class LiveStreaming { } private async start() { - if (this.stream.destroyed) { + if (this.destroyed) { this.cleanup(); return; } @@ -100,17 +99,17 @@ export class LiveStreaming { await new Promise(async (resolve, reject) => { const stream = await request_stream(this.base_url + segment).catch((err: Error) => err); if (stream instanceof Error) { - this.stream.emit('error', stream); + this.emit('error', stream); return; } this.request = stream; - stream.pipe(this.stream, { end: false }); + stream.pipe(this, { end: false }); stream.on('end', () => { this.packet_count++; resolve(''); }); stream.once('error', (err) => { - this.stream.emit('error', err); + this.emit('error', err); }); }); } @@ -120,9 +119,8 @@ export class LiveStreaming { } } -export class Stream { +export class Stream extends PassThrough { type: StreamType; - stream: PassThrough; private url: string; private bytes_count: number; private per_sec_bytes: number; @@ -141,9 +139,9 @@ export class Stream { video_url: string, cookie: string ) { + super({ highWaterMark: 10 * 1000 * 1000 }); this.url = url; this.type = type; - this.stream = new PassThrough({ highWaterMark: 10 * 1000 * 1000 }); this.bytes_count = 0; this.video_url = video_url; this.cookie = cookie; @@ -155,16 +153,16 @@ export class Stream { this.request = null; this.data_ended = false; this.playing_count = 0; - this.stream.on('close', () => { + this.on('close', () => { this.cleanup(); }); - this.stream.on('pause', () => { + this.on('pause', () => { this.playing_count++; if (this.data_ended) { this.bytes_count = 0; this.per_sec_bytes = 0; this.cleanup(); - this.stream.removeAllListeners('pause'); + this.removeAllListeners('pause'); } else if (this.playing_count === 280) { this.playing_count = 0; this.loop(); @@ -180,7 +178,7 @@ export class Stream { private cleanup() { clearInterval(this.timer as NodeJS.Timer); - this.request?.unpipe(this.stream); + this.request?.unpipe(this); this.request?.destroy(); this.timer = null; this.request = null; @@ -188,7 +186,7 @@ export class Stream { } private async loop() { - if (this.stream.destroyed) { + if (this.destroyed) { this.cleanup(); return; } @@ -199,7 +197,7 @@ export class Stream { } }).catch((err: Error) => err); if (stream instanceof Error) { - this.stream.emit('error', stream); + this.emit('error', stream); this.data_ended = true; this.bytes_count = 0; this.per_sec_bytes = 0; @@ -218,10 +216,10 @@ export class Stream { return; } this.request = stream; - stream.pipe(this.stream, { end: false }); + stream.pipe(this, { end: false }); stream.once('error', (err) => { - this.stream.emit('error', err); + this.emit('error', err); }); stream.on('data', (chunk: any) => {