From 02fcb61c87d6d63c0f0aa4411f7934a12fedbfa9 Mon Sep 17 00:00:00 2001 From: killer069 <65385476+killer069@users.noreply.github.com> Date: Mon, 30 Aug 2021 21:48:57 +0530 Subject: [PATCH] TCP connection fix --- play-dl/YouTube/classes/LiveStream.ts | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/play-dl/YouTube/classes/LiveStream.ts b/play-dl/YouTube/classes/LiveStream.ts index 5f59af9..1cf5e16 100644 --- a/play-dl/YouTube/classes/LiveStream.ts +++ b/play-dl/YouTube/classes/LiveStream.ts @@ -1,6 +1,7 @@ import { PassThrough } from 'stream' import got from 'got' import { StreamType } from '../stream'; +import { Socket } from 'net' export interface FormatInterface{ url : string; @@ -17,6 +18,7 @@ export class LiveStreaming{ private packet_count : number private timer : NodeJS.Timer | null private segments_urls : string[] + private socket : Socket | null constructor(dash_url : string, target_interval : number){ this.type = StreamType.Arbitrary this.url = dash_url @@ -25,6 +27,7 @@ export class LiveStreaming{ this.segments_urls = [] this.packet_count = 0 this.timer = null + this.socket = null this.interval = target_interval * 1000 || 0 this.stream.on('close', () => { this.cleanup() @@ -44,6 +47,8 @@ export class LiveStreaming{ private cleanup(){ clearTimeout(this.timer as NodeJS.Timer) + this.socket?.destroy() + this.socket = null this.timer = null this.url = '' this.base_url = '' @@ -67,6 +72,7 @@ export class LiveStreaming{ return new Promise(async (resolve, reject) => { let stream = got.stream(this.base_url + segment) stream.on('data', (chunk: any) => this.stream.write(chunk)) + stream.once('data', () => {this.socket = stream.socket as Socket}) stream.on('end', () => { this.packet_count++ resolve('') @@ -87,6 +93,7 @@ export class LiveEnded{ private base_url : string; private packet_count : number private segments_urls : string[] + private socket : Socket | null constructor(dash_url : string){ this.type = StreamType.Arbitrary this.url = dash_url @@ -94,6 +101,7 @@ export class LiveEnded{ this.stream = new PassThrough({ highWaterMark : 10 * 1000 * 1000 }) this.segments_urls = [] this.packet_count = 0 + this.socket = null this.stream.on('close', () => { this.cleanup() }) @@ -111,6 +119,8 @@ export class LiveEnded{ } private cleanup(){ + this.socket?.destroy() + this.socket = null this.url = '' this.base_url = '' this.segments_urls = [] @@ -136,6 +146,7 @@ export class LiveEnded{ return new Promise(async (resolve, reject) => { let stream = got.stream(this.base_url + segment) stream.on('data', (chunk: any) => this.stream.write(chunk)) + stream.once('data', () => {this.socket = stream.socket as Socket}) stream.on('end', () => { this.packet_count++ resolve('') @@ -154,6 +165,7 @@ export class Stream { private per_sec_bytes : number; private duration : number; private timer : NodeJS.Timer | null + private socket : Socket | null constructor(url : string, type : StreamType, duration : number){ this.url = url this.type = type @@ -161,12 +173,15 @@ export class Stream { this.bytes_count = 0 this.per_sec_bytes = 0 this.timer = null + this.socket = null this.duration = duration; (duration > 300) ? this.loop_start() : this.normal_start() } private cleanup(){ clearTimeout(this.timer as NodeJS.Timer) + this.socket?.destroy() + this.socket = null this.timer = null this.url = '' this.bytes_count = 0 @@ -180,6 +195,7 @@ export class Stream { } let stream = got.stream(this.url) stream.pipe(this.stream) + stream.once('data', () => {this.socket = stream.socket as Socket}) } private loop_start(){ @@ -190,6 +206,7 @@ export class Stream { let stream = got.stream(this.url) stream.once('data', () => { this.per_sec_bytes = Math.ceil((stream.downloadProgress.total as number)/this.duration) + this.socket = stream.socket as Socket }) stream.on('data', (chunk: any) => { @@ -224,6 +241,7 @@ export class Stream { this.bytes_count += chunk.length this.stream.write(chunk) }) + stream.once('data', () => {this.socket = stream.socket as Socket}) stream.on('data', () => { if(absolute_bytes > (this.per_sec_bytes * 300)){ stream.destroy()