Skip to content

Commit c34c006

Browse files
committed
fix: add support for streaming input data into ffmpeg
This is used when source is ftps/sftp, which is not natively supported by ffmpeg
1 parent 8776cff commit c34c006

4 files changed

Lines changed: 129 additions & 30 deletions

File tree

shared/packages/worker/src/worker/workers/genericWorker/expectationHandlers/lib/ffmpeg.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ export async function spawnFFMpeg<Metadata>(
3737
onDone: () => Promise<void>,
3838
onFail: (err?: any) => Promise<void>,
3939
onProgress?: (progress: number) => Promise<void>,
40+
onStart?: (ffMpegProcess: ChildProcessWithoutNullStreams) => void,
4041
log?: (str: string) => void
4142
): Promise<FFMpegProcess> {
4243
let FFMpegIsDone = false
@@ -63,8 +64,8 @@ export async function spawnFFMpeg<Metadata>(
6364
pipeStdOut = true
6465
args.push('pipe:1') // pipe output to stdout
6566
} else if (isFTPAccessorHandle(targetHandle)) {
66-
if (targetHandle.ftpUrl.url.startsWith('ftps://')) {
67-
// ffmpeg doesn't support ftps
67+
if (targetHandle.ftpUrl.url.startsWith('ftps://') || targetHandle.ftpUrl.url.startsWith('sftp://')) {
68+
// ffmpeg doesn't support ftps protocol, stream instead
6869
pipeStdOut = true
6970
args.push('pipe:1') // pipe output to stdout
7071
} else {
@@ -91,6 +92,7 @@ export async function spawnFFMpeg<Metadata>(
9192
}
9293
ffMpegProcess = undefined
9394
}
95+
onStart?.(ffMpegProcess)
9496

9597
if (pipeStdOut) {
9698
log?.('ffmpeg: pipeStdOut')

shared/packages/worker/src/worker/workers/genericWorker/expectationHandlers/lib/scan.ts

Lines changed: 67 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
AccessorOnPackage,
88
LoggerInstance,
99
escapeFilePath,
10+
stringifyError,
1011
} from '@sofie-package-manager/api'
1112
import {
1213
isQuantelClipAccessorHandle,
@@ -33,7 +34,7 @@ import { HTTPProxyAccessorHandle } from '../../../../accessorHandlers/httpProxy'
3334
import { HTTPAccessorHandle } from '../../../../accessorHandlers/http'
3435
import { MAX_EXEC_BUFFER } from '../../../../lib/lib'
3536
import { getFFMpegExecutable, getFFProbeExecutable } from './ffmpeg'
36-
import { GenericAccessorHandle } from '../../../../accessorHandlers/genericHandle'
37+
import { GenericAccessorHandle, PackageReadStream } from '../../../../accessorHandlers/genericHandle'
3738
import { FTPAccessorHandle } from '../../../../accessorHandlers/ftp'
3839

3940
export interface FFProbeScanResultStream {
@@ -68,6 +69,7 @@ export function scanWithFFProbe(
6869
) {
6970
let inputPath: string
7071
let filePath: string
72+
let pipeStdin = false
7173
if (isLocalFolderAccessorHandle(sourceHandle)) {
7274
inputPath = sourceHandle.fullPath
7375
filePath = sourceHandle.filePath
@@ -82,8 +84,14 @@ export function scanWithFFProbe(
8284
inputPath = sourceHandle.fullUrl
8385
filePath = sourceHandle.filePath
8486
} else if (isFTPAccessorHandle(sourceHandle)) {
85-
inputPath = sourceHandle.ftpUrl.url
86-
filePath = sourceHandle.filePath
87+
if (sourceHandle.ftpUrl.url.startsWith('ftps://') || sourceHandle.ftpUrl.url.startsWith('sftp://')) {
88+
// ffmpeg doesn't support ftps protocol, stream instead
89+
pipeStdin = true
90+
inputPath = '-' // stream on stdin
91+
} else {
92+
inputPath = sourceHandle.ftpUrl.url
93+
filePath = sourceHandle.filePath
94+
}
8795
} else {
8896
assertNever(sourceHandle)
8997
throw new Error('Unknown handle')
@@ -98,38 +106,71 @@ export function scanWithFFProbe(
98106
'-print_format',
99107
'json',
100108
]
109+
101110
let ffProbeProcess: ChildProcess | undefined = undefined
111+
let sourceStream: PackageReadStream | undefined = undefined
102112
onCancel(() => {
103113
ffProbeProcess?.stdin?.write('q') // send "q" to quit, because .kill() doesn't quite do it.
104114
ffProbeProcess?.kill()
115+
sourceStream?.cancel()
105116
reject('Cancelled')
106117
})
107118

108-
ffProbeProcess = execFile(
109-
getFFProbeExecutable(),
110-
args,
111-
{
112-
maxBuffer: MAX_EXEC_BUFFER,
113-
windowsVerbatimArguments: true, // To fix an issue with ffprobe.exe on Windows
114-
},
115-
(err, stdout, _stderr) => {
116-
// this.logger.debug(`Worker: metadata generate: output (stdout, stderr)`, stdout, stderr)
117-
ffProbeProcess = undefined
118-
if (err) {
119-
reject(err)
120-
return
121-
}
122-
const json: FFProbeScanResult = JSON.parse(stdout)
123-
if (!json.streams || !json.streams[0]) {
124-
reject(new Error(`File doesn't seem to be a media file`))
125-
return
126-
}
127-
json.filePath = filePath
119+
ffProbeProcess = spawn(getFFProbeExecutable(), args, {
120+
// maxBuffer: MAX_EXEC_BUFFER,
121+
windowsVerbatimArguments: true, // To fix an issue with ffprobe.exe on Windows
122+
})
123+
let stdoutBuffer = ''
124+
ffProbeProcess.stdout?.on('data', (data) => {
125+
stdoutBuffer += data.toString()
126+
})
128127

129-
fixJSONResult(json)
130-
resolve(json)
128+
ffProbeProcess.on('error', (err) => {
129+
ffProbeProcess = undefined
130+
reject(err)
131+
})
132+
ffProbeProcess.on('close', (code) => {
133+
ffProbeProcess = undefined
134+
135+
if (code !== 0) {
136+
reject(new Error(`FFProbe exited with code ${code}`))
137+
return
131138
}
132-
)
139+
140+
const json: FFProbeScanResult = JSON.parse(stdoutBuffer)
141+
if (!json.streams || !json.streams[0]) {
142+
reject(new Error(`File doesn't seem to be a media file`))
143+
return
144+
}
145+
json.filePath = filePath
146+
fixJSONResult(json)
147+
resolve(json)
148+
})
149+
if (pipeStdin) {
150+
sourceStream = await sourceHandle.getPackageReadStream()
151+
if (ffProbeProcess.stdin === null) {
152+
throw new Error('ffprobeProcess.stdin is null, cant pipe to it')
153+
}
154+
sourceStream.readStream.on('error', (err) => {
155+
// wait just a little bit before throwing, perhaps ffprobe is already done?
156+
setTimeout(() => {
157+
if (ffProbeProcess) {
158+
// still going, so this is a real error
159+
reject(new Error('Error reading source stream: ' + stringifyError(err)))
160+
}
161+
}, 10)
162+
})
163+
ffProbeProcess.stdin.on('error', (err) => {
164+
// wait just a little bit before throwing, perhaps ffprobe is already done?
165+
setTimeout(() => {
166+
if (ffProbeProcess) {
167+
// still going, so this is a real error
168+
reject(new Error('Error writing stream: ' + stringifyError(err)))
169+
}
170+
}, 10)
171+
})
172+
sourceStream.readStream.pipe(ffProbeProcess.stdin)
173+
}
133174
} else if (isQuantelClipAccessorHandle(sourceHandle)) {
134175
// Because we have no good way of using ffprobe to generate the info we want,
135176
// we resort to faking it:

shared/packages/worker/src/worker/workers/genericWorker/expectationHandlers/mediaFilePreview.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import { FFMpegProcess, spawnFFMpeg } from './lib/ffmpeg'
3131
import { ExpectationHandlerGenericWorker, GenericWorker } from '../genericWorker'
3232
import { scanWithFFProbe, FFProbeScanResult } from './lib/scan'
3333
import { CancelablePromise } from '../../../lib/cancelablePromise'
34+
import { PackageReadStream } from '../../../accessorHandlers/genericHandle'
3435

3536
/**
3637
* Generates a low-res preview video of a source video file, and stores the resulting file into the target PackageContainer
@@ -212,10 +213,12 @@ export const MediaFilePreview: ExpectationHandlerGenericWorker = {
212213

213214
let ffMpegProcess: FFMpegProcess | undefined
214215
let ffProbeProcess: CancelablePromise<any> | undefined
216+
let sourceStream: PackageReadStream | undefined = undefined
215217
const workInProgress = new WorkInProgress({ workLabel: 'Generating preview' }, async () => {
216218
// On cancel
217219
ffMpegProcess?.cancel()
218220
ffProbeProcess?.cancel()
221+
sourceStream?.cancel()
219222
}).do(async () => {
220223
const tryReadPackage = await sourceHandle.checkPackageReadAccess()
221224
if (!tryReadPackage.success) throw new Error(tryReadPackage.reason.tech)
@@ -241,6 +244,7 @@ export const MediaFilePreview: ExpectationHandlerGenericWorker = {
241244
await targetHandle.removePackage('Prepare for preview generation')
242245

243246
let inputPath: string
247+
let pipeStdin = false
244248
if (isLocalFolderAccessorHandle(sourceHandle)) {
245249
inputPath = sourceHandle.fullPath
246250
} else if (isFileShareAccessorHandle(sourceHandle)) {
@@ -251,7 +255,16 @@ export const MediaFilePreview: ExpectationHandlerGenericWorker = {
251255
} else if (isHTTPProxyAccessorHandle(sourceHandle)) {
252256
inputPath = sourceHandle.fullUrl
253257
} else if (isFTPAccessorHandle(sourceHandle)) {
254-
inputPath = sourceHandle.ftpUrl.url
258+
if (
259+
sourceHandle.ftpUrl.url.startsWith('ftps://') ||
260+
sourceHandle.ftpUrl.url.startsWith('sftp://')
261+
) {
262+
// ffmpeg doesn't support ftps protocol, stream instead
263+
pipeStdin = true
264+
inputPath = '-' // stream on stdin
265+
} else {
266+
inputPath = sourceHandle.ftpUrl.url
267+
}
255268
} else {
256269
assertNever(sourceHandle)
257270
throw new Error(`Unsupported Target AccessHandler`)
@@ -309,6 +322,21 @@ export const MediaFilePreview: ExpectationHandlerGenericWorker = {
309322
},
310323
async (progress: number) => {
311324
workInProgress._reportProgress(actualSourceVersionHash, progress)
325+
},
326+
(ffmpegProcess) => {
327+
if (pipeStdin) {
328+
lookupSource.handle
329+
.getPackageReadStream()
330+
.then((stream) => {
331+
sourceStream = stream
332+
sourceStream.readStream.pipe(ffmpegProcess.stdin)
333+
})
334+
.catch((err) => {
335+
workInProgress._reportError(
336+
new Error(`FFMpeg stdin piping error: ${stringifyError(err)}`)
337+
)
338+
})
339+
}
312340
}
313341
//,worker.logger.debug
314342
)

shared/packages/worker/src/worker/workers/genericWorker/expectationHandlers/mediaFileThumbnail.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import { FFMpegProcess, spawnFFMpeg } from './lib/ffmpeg'
3232
import { ExpectationHandlerGenericWorker, GenericWorker } from '../genericWorker'
3333
import { CancelablePromise } from '../../../lib/cancelablePromise'
3434
import { scanWithFFProbe, FFProbeScanResult } from './lib/scan'
35+
import { PackageReadStream } from '../../../accessorHandlers/genericHandle'
3536

3637
/**
3738
* Generates a thumbnail image from a source video file, and stores the resulting file into the target PackageContainer
@@ -178,10 +179,12 @@ export const MediaFileThumbnail: ExpectationHandlerGenericWorker = {
178179

179180
let ffMpegProcess: FFMpegProcess | undefined
180181
let ffProbeProcess: CancelablePromise<any> | undefined
182+
let sourceStream: PackageReadStream | undefined = undefined
181183
const workInProgress = new WorkInProgress({ workLabel: 'Generating thumbnail' }, async () => {
182184
// On cancel
183185
ffMpegProcess?.cancel()
184186
ffProbeProcess?.cancel()
187+
sourceStream?.cancel()
185188
}).do(async () => {
186189
if (
187190
(lookupSource.accessor.type === Accessor.AccessType.LOCAL_FOLDER ||
@@ -241,6 +244,7 @@ export const MediaFileThumbnail: ExpectationHandlerGenericWorker = {
241244
const seekTimeCode: string | undefined = seekTime !== undefined ? formatTimeCode(seekTime) : undefined
242245

243246
let inputPath: string
247+
let pipeStdin = false
244248
if (isLocalFolderAccessorHandle(sourceHandle)) {
245249
inputPath = sourceHandle.fullPath
246250
} else if (isFileShareAccessorHandle(sourceHandle)) {
@@ -251,7 +255,16 @@ export const MediaFileThumbnail: ExpectationHandlerGenericWorker = {
251255
} else if (isHTTPProxyAccessorHandle(sourceHandle)) {
252256
inputPath = sourceHandle.fullUrl
253257
} else if (isFTPAccessorHandle(sourceHandle)) {
254-
inputPath = sourceHandle.ftpUrl.url
258+
if (
259+
sourceHandle.ftpUrl.url.startsWith('ftps://') ||
260+
sourceHandle.ftpUrl.url.startsWith('sftp://')
261+
) {
262+
// ffmpeg doesn't support ftps protocol, stream instead
263+
pipeStdin = true
264+
inputPath = '-' // stream on stdin
265+
} else {
266+
inputPath = sourceHandle.ftpUrl.url
267+
}
255268
} else {
256269
assertNever(sourceHandle)
257270
throw new Error(`Unsupported Target AccessHandler`)
@@ -299,6 +312,21 @@ export const MediaFileThumbnail: ExpectationHandlerGenericWorker = {
299312
},
300313
async (progress: number) => {
301314
workInProgress._reportProgress(sourceVersionHash, progress)
315+
},
316+
(ffmpegProcess) => {
317+
if (pipeStdin) {
318+
lookupSource.handle
319+
.getPackageReadStream()
320+
.then((stream) => {
321+
sourceStream = stream
322+
sourceStream.readStream.pipe(ffmpegProcess.stdin)
323+
})
324+
.catch((err) => {
325+
workInProgress._reportError(
326+
new Error(`FFMpeg stdin piping error: ${stringifyError(err)}`)
327+
)
328+
})
329+
}
302330
}
303331
// ,worker.logger.debug
304332
)

0 commit comments

Comments
 (0)