LiveStream update, added type making it easy for playback

This commit is contained in:
killer069 2021-08-20 23:37:30 +05:30
parent 98ccbb9671
commit f8d72f85d3
4 changed files with 123 additions and 94 deletions

View File

@ -1,5 +1,7 @@
import { PassThrough } from 'stream' import { PassThrough } from 'stream'
import got from 'got' import got from 'got'
import Request from 'got/dist/source/core';
import { StreamType } from '../stream';
export interface FormatInterface{ export interface FormatInterface{
url : string; url : string;
@ -8,56 +10,105 @@ export interface FormatInterface{
} }
export class LiveStreaming{ export class LiveStreaming{
smooth : boolean; type : StreamType
private __stream : PassThrough actual_live : boolean;
stream : PassThrough
private format : FormatInterface private format : FormatInterface
private interval : number private interval : number
private packet_count : number private packet_count : number
private timer : NodeJS.Timer | null private timer : NodeJS.Timer | null
private segments_urls : string[] private segments_urls : string[]
constructor(format : FormatInterface, smooth : boolean){ constructor(format : FormatInterface, actual_live : boolean){
this.smooth = smooth || false this.type = StreamType.Arbitrary
this.actual_live = actual_live || false
this.format = format this.format = format
this.__stream = new PassThrough({ highWaterMark : 10 * 1000 * 1000 }) this.stream = new PassThrough({ highWaterMark : 10 * 1000 * 1000 })
this.segments_urls = [] this.segments_urls = []
this.packet_count = 0 this.packet_count = 0
this.interval = 0 this.interval = 0
this.timer = null this.timer = null
this.__stream.on('close', () => { this.stream.on('close', () => {
this.cleanup() this.cleanup()
}) });
if(this.smooth === true) this.__stream.pause() (this.actual_live) ? this.live_loop() :this.start()
this.start()
} }
async manifest_getter(){ private async live_loop(){
if(this.stream.destroyed) this.cleanup()
await this.manifest_getter()
this.segments_urls.splice(0, (this.segments_urls.length / 2))
if(this.packet_count === 0) this.packet_count = Number(this.segments_urls[0].split('index.m3u8/sq/')[1].split('/')[0])
for await (let url of this.segments_urls){
await (async () => {
return new Promise(async (resolve, reject) => {
if(Number(url.split('index.m3u8/sq/')[1].split('/')[0]) !== this.packet_count){
resolve('')
return
}
let stream = this.got_stream(url)
stream.on('data', (chunk) => this.stream.write(chunk))
stream.on('end', () => {
this.packet_count++
resolve('')
})
})
})()
}
this.interval = 1
this.timer = setTimeout(async () => {
await this.looping()
}, this.interval)
}
private async looping(){
if(this.stream.destroyed) this.cleanup()
await this.manifest_getter()
for await (let url of this.segments_urls){
await (async () => {
return new Promise(async (resolve, reject) => {
if(Number(url.split('index.m3u8/sq/')[1].split('/')[0]) !== this.packet_count){
resolve('')
return
}
let stream = this.got_stream(url)
stream.on('data', (chunk) => this.stream.write(chunk))
stream.on('end', () => {
this.packet_count++
resolve('')
})
})
})()
}
this.interval = 1
this.timer = setTimeout(async () => {
await this.looping()
}, this.interval)
}
private async manifest_getter(){
let response = await got(this.format.url) let response = await got(this.format.url)
this.segments_urls = response.body.split('\n').filter((x) => x.startsWith('https')) this.segments_urls = response.body.split('\n').filter((x) => x.startsWith('https'))
} }
get stream(){
return this.__stream
}
private cleanup(){ private cleanup(){
clearInterval(this.timer as NodeJS.Timer) clearInterval(this.timer as NodeJS.Timer)
this.segments_urls = [] this.segments_urls = []
this.packet_count = 0 this.packet_count = 0
} }
async start(){ private async start(){
if(this.__stream.destroyed) this.cleanup() if(this.stream.destroyed) this.cleanup()
await this.manifest_getter() await this.manifest_getter()
if(this.packet_count === 0) this.packet_count = Number(this.segments_urls[0].split('index.m3u8/sq/')[1].split('/')[0]) if(this.packet_count === 0) this.packet_count = Number(this.segments_urls[0].split('index.m3u8/sq/')[1].split('/')[0])
for await (let url of this.segments_urls){ for await (let url of this.segments_urls){
await (async () => { await (async () => {
return new Promise(async (resolve, reject) => { return new Promise(async (resolve, reject) => {
if(Number(url.split('index.m3u8/sq/')[1].split('/')[0]) !== this.packet_count){ if(Number(url.split('index.m3u8/sq/')[1].split('/')[0]) !== this.packet_count){
resolve('') resolve('')
return return
} }
let stream = this.got_stream(url) let stream = this.got_stream(url)
stream.on('data', (chunk) => this.__stream.write(chunk)) stream.on('data', (chunk) => this.stream.write(chunk))
stream.on('end', () => { stream.on('end', () => {
this.packet_count++ this.packet_count++
resolve('') resolve('')
@ -67,10 +118,6 @@ export class LiveStreaming{
} }
this.interval = (this.segments_urls.length / 2) * 1000 this.interval = (this.segments_urls.length / 2) * 1000
this.timer = setTimeout(async () => { this.timer = setTimeout(async () => {
if(this.smooth === true){
this.__stream.resume()
this.smooth = false
}
await this.start() await this.start()
}, this.interval) }, this.interval)
} }
@ -81,16 +128,18 @@ export class LiveStreaming{
} }
export class LiveEnded{ export class LiveEnded{
private __stream : PassThrough type : StreamType
stream : PassThrough
private format : FormatInterface private format : FormatInterface
private packet_count : number private packet_count : number
private segments_urls : string[] private segments_urls : string[]
constructor(format : FormatInterface){ constructor(format : FormatInterface){
this.type = StreamType.Arbitrary
this.format = format this.format = format
this.__stream = new PassThrough({ highWaterMark : 10 * 1000 * 1000 }) this.stream = new PassThrough({ highWaterMark : 10 * 1000 * 1000 })
this.segments_urls = [] this.segments_urls = []
this.packet_count = 0 this.packet_count = 0
this.__stream.on('close', () => { this.stream.on('close', () => {
this.cleanup() this.cleanup()
}) })
this.start() this.start()
@ -101,17 +150,13 @@ export class LiveEnded{
this.segments_urls = response.body.split('\n').filter((x) => x.startsWith('https')) this.segments_urls = response.body.split('\n').filter((x) => x.startsWith('https'))
} }
get stream(){
return this.__stream
}
private cleanup(){ private cleanup(){
this.segments_urls = [] this.segments_urls = []
this.packet_count = 0 this.packet_count = 0
} }
async start(){ async start(){
if(this.__stream.destroyed) this.cleanup() if(this.stream.destroyed) this.cleanup()
await this.manifest_getter() await this.manifest_getter()
if(this.packet_count === 0) this.packet_count = Number(this.segments_urls[0].split('index.m3u8/sq/')[1].split('/')[0]) if(this.packet_count === 0) this.packet_count = Number(this.segments_urls[0].split('index.m3u8/sq/')[1].split('/')[0])
for await (let url of this.segments_urls){ for await (let url of this.segments_urls){
@ -122,7 +167,7 @@ export class LiveEnded{
return return
} }
let stream = this.got_stream(url) let stream = this.got_stream(url)
stream.on('data', (chunk) => this.__stream.write(chunk)) stream.on('data', (chunk) => this.stream.write(chunk))
stream.on('end', () => { stream.on('end', () => {
this.packet_count++ this.packet_count++
resolve('') resolve('')
@ -137,3 +182,18 @@ export class LiveEnded{
} }
} }
export class Stream {
type : StreamType
private piping_stream : Request
private playing_stream : PassThrough
constructor(url : string, type : StreamType){
this.type = type
this.piping_stream = got.stream(url)
this.playing_stream = new PassThrough({ highWaterMark : 10 * 1000 * 1000 })
this.piping_stream.pipe(this.playing_stream)
}
get stream(){
return this.playing_stream
}
}

View File

@ -1,3 +1,3 @@
export { search } from './search' export { search } from './search'
export { stream, stream_from_info, stream_type } from './stream' export { stream, stream_from_info } from './stream'
export * from './utils' export * from './utils'

View File

@ -1,10 +1,7 @@
import got from "got"
import { video_info } from "." import { video_info } from "."
import { PassThrough } from 'stream' import { FormatInterface, LiveEnded, LiveStreaming, Stream } from "./classes/LiveStream"
import https from 'https'
import { FormatInterface, LiveEnded, LiveStreaming } from "./classes/LiveStream"
enum StreamType{ export enum StreamType{
Arbitrary = 'arbitrary', Arbitrary = 'arbitrary',
Raw = 'raw', Raw = 'raw',
OggOpus = 'ogg/opus', OggOpus = 'ogg/opus',
@ -13,7 +10,7 @@ enum StreamType{
} }
interface StreamOptions { interface StreamOptions {
smooth : boolean actual_live : boolean
} }
interface InfoData{ interface InfoData{
@ -40,79 +37,59 @@ function parseAudioFormats(formats : any[]){
return result return result
} }
export async function stream(url : string, options : StreamOptions = { smooth : false }): Promise<PassThrough>{ export async function stream(url : string, options : StreamOptions = { actual_live : false }): Promise<Stream | LiveStreaming | LiveEnded>{
let info = await video_info(url) let info = await video_info(url)
let final: any[] = []; let final: any[] = [];
let type : StreamType;
if(info.LiveStreamData.isLive === true && info.LiveStreamData.hlsManifestUrl !== null) { if(info.LiveStreamData.isLive === true && info.LiveStreamData.hlsManifestUrl !== null) {
return await live_stream(info as InfoData, options.smooth) return await live_stream(info as InfoData, options.actual_live)
} }
let audioFormat = parseAudioFormats(info.format) let audioFormat = parseAudioFormats(info.format)
let opusFormats = filterFormat(audioFormat, "opus") let opusFormats = filterFormat(audioFormat, "opus")
if(opusFormats.length === 0){ if(opusFormats.length === 0){
type = StreamType.Arbitrary
final.push(audioFormat[audioFormat.length - 1]) final.push(audioFormat[audioFormat.length - 1])
} }
else{ else{
type = StreamType.WebmOpus
final.push(opusFormats[opusFormats.length - 1]) final.push(opusFormats[opusFormats.length - 1])
} }
if(final.length === 0) final.push(info.format[info.format.length - 1]) if(final.length === 0) {
let piping_stream = got.stream(final[0].url, { type = StreamType.Arbitrary
retry : 5, final.push(info.format[info.format.length - 1])
headers: { }
'Connection': 'keep-alive',
'Accept-Encoding': '',
'Accept-Language': 'en-US,en;q=0.8',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36'
},
agent : {
https : new https.Agent({ keepAlive : true })
},
http2 : true
})
let playing_stream = new PassThrough({ highWaterMark: 10 * 1000 * 1000 })
piping_stream.pipe(playing_stream) return new Stream(final[0].url, type)
return playing_stream
} }
export async function stream_from_info(info : InfoData, options : StreamOptions){ export async function stream_from_info(info : InfoData, options : StreamOptions = { actual_live : false }): Promise<Stream | LiveStreaming | LiveEnded>{
let final: any[] = []; let final: any[] = [];
let type : StreamType;
if(info.LiveStreamData.isLive === true) { if(info.LiveStreamData.isLive === true && info.LiveStreamData.hlsManifestUrl !== null) {
return await live_stream(info as InfoData, options.smooth) return await live_stream(info as InfoData, options.actual_live)
} }
let audioFormat = parseAudioFormats(info.format) let audioFormat = parseAudioFormats(info.format)
let opusFormats = filterFormat(audioFormat, "opus") let opusFormats = filterFormat(audioFormat, "opus")
if(opusFormats.length === 0){ if(opusFormats.length === 0){
type = StreamType.Arbitrary
final.push(audioFormat[audioFormat.length - 1]) final.push(audioFormat[audioFormat.length - 1])
} }
else{ else{
type = StreamType.WebmOpus
final.push(opusFormats[opusFormats.length - 1]) final.push(opusFormats[opusFormats.length - 1])
} }
if(final.length === 0) final.push(info.format[info.format.length - 1]) if(final.length === 0) {
let piping_stream = got.stream(final[0].url, { type = StreamType.Arbitrary
retry : 5, final.push(info.format[info.format.length - 1])
headers: { }
'Connection': 'keep-alive',
'Accept-Encoding': '',
'Accept-Language': 'en-US,en;q=0.8',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36'
},
agent : {
https : new https.Agent({ keepAlive : true })
},
http2 : true
})
let playing_stream = new PassThrough({ highWaterMark: 10 * 1000 * 1000 })
piping_stream.pipe(playing_stream) return new Stream(final[0].url, type)
return playing_stream
} }
function filterFormat(formats : any[], codec : string){ function filterFormat(formats : any[], codec : string){
@ -123,12 +100,7 @@ function filterFormat(formats : any[], codec : string){
return result return result
} }
export function stream_type(info:InfoData): StreamType{ async function live_stream(info : InfoData, actual_live : boolean): Promise<LiveStreaming | LiveEnded>{
if(info.LiveStreamData.isLive === true && info.LiveStreamData.hlsManifestUrl !== null) return StreamType.Arbitrary
else return StreamType.WebmOpus
}
async function live_stream(info : InfoData, smooth : boolean): Promise<PassThrough>{
let res_144 : FormatInterface = { let res_144 : FormatInterface = {
url : '', url : '',
targetDurationSec : 0, targetDurationSec : 0,
@ -140,10 +112,10 @@ async function live_stream(info : InfoData, smooth : boolean): Promise<PassThrou
}) })
let stream : LiveStreaming | LiveEnded let stream : LiveStreaming | LiveEnded
if(info.video_details.duration === '0') { if(info.video_details.duration === '0') {
stream = new LiveStreaming((res_144.url.length !== 0) ? res_144 : info.format[info.format.length - 1], smooth) stream = new LiveStreaming((res_144.url.length !== 0) ? res_144 : info.format[info.format.length - 1], actual_live)
} }
else { else {
stream = new LiveEnded((res_144.url.length !== 0) ? res_144 : info.format[info.format.length - 1]) stream = new LiveEnded((res_144.url.length !== 0) ? res_144 : info.format[info.format.length - 1])
} }
return stream.stream return stream
} }

View File

@ -1,4 +1 @@
//This File is in testing stage, everything will change in this export { playlist_info, video_basic_info, video_info, search, stream, stream_from_info } from "./YouTube";
import { playlist_info, video_basic_info, video_info, search, stream, stream_from_info, stream_type } from "./YouTube";
export let youtube = { playlist_info, video_basic_info, video_info, search , stream, stream_from_info, stream_type}