@mdf.js/file-flinger is a robust module within the @mdf.js ecosystem, designed to facilitate customized data file processing workflows for cold path ingestion. It provides a versatile framework for constructing file processing pipelines, enabling developers to define custom pushers to deliver data files to various destinations.
Before delving into the documentation, it is essential to understand the core concepts within @mdf.js/file-flinger.
npm:
npm install @mdf.js/file-flinger
yarn:
yarn add @mdf.js/file-flinger
To build a file processing workflow, you need to create custom pushers that send processed files to specific destinations. A pusher is responsible for delivering files to storage systems like databases, cloud storage, or data lakes.
To create a pusher, you need to define a class that implements the Pusher interface. This interface extends Layer.App.Resource, which provides methods and properties for managing the resource lifecycle and health status.
When implementing a pusher, make sure it provides the following:
- The start(), stop(), and close() methods to manage the pusher's lifecycle.
- The push(filePath: string, key: string) method, which handles the logic to send the file to the destination using the provided file path and key.
- A metrics getter that returns a Prometheus Registry containing the pusher's metrics.
- The status and checks getters that return the pusher's health status and checks, which are crucial for monitoring the pusher's health.
If you are using the @mdf.js framework to create your pusher, you can integrate the pusher's health information with the provider's health information.
Here is an example of a custom pusher class:
import { EventEmitter } from 'events';
import { Pusher } from '@mdf.js/file-flinger';
import { Registry } from 'prom-client';
import { Health } from '@mdf.js/core';

// Class that implements the Pusher interface
class MyCustomPusher extends EventEmitter implements Pusher {
  /** Constructor */
  constructor() {
    super();
  }
  /**
   * Push the file to the storage
   * @param filePath - The file path to push
   * @param key - The key to use
   */
  public async push(filePath: string, key: string): Promise<void> {
    // Implementation of file pushing logic
  }
  /** Start the pusher and the underlying provider */
  public async start(): Promise<void> {
    // Initialization logic
  }
  /** Stop the pusher and the underlying provider */
  public async stop(): Promise<void> {
    // Graceful shutdown logic
  }
  /** Stop the pusher and the underlying provider and clean the resources */
  public async close(): Promise<void> {
    // Cleanup logic
  }
  /** Prometheus registry to store the metrics of the pusher */
  public get metrics(): Registry {
    // Return Prometheus registry with pusher metrics
    return new Registry();
  }
  /** Pusher health status */
  public get status(): Health.Status {
    // Return health status
    return 'pass';
  }
  /** Pusher health checks */
  public get checks(): Health.Checks {
    // Return object with health checks
    return {};
  }
}
The FileFlinger class extends EventEmitter and emits several events that you can listen to:
- error: Emitted when the component detects an error.
fileFlinger.on('error', (error) => {
  console.error('An error occurred:', error);
});
- status: Emitted when the component's status changes.
fileFlinger.on('status', (status) => {
  console.log('FileFlinger status:', status);
});
To instantiate a FileFlinger, you need to provide a name and an options object that configures its behavior. The options include:
- pushers: An array of pushers that will be used to send files to their destinations.
- watchPath: The path or array of paths to monitor for incoming files.
- filePattern (default: undefined): A glob pattern or custom pattern to match the files to be processed.
- keyPattern (default: {_filename}): A pattern used by the key generator (Keygen) to create unique keys for the files.
- defaultValues (default: {}): An object containing default values for placeholders used in patterns.
- cwd (default: undefined): The base directory for relative paths.
- maxErrors (default: 10): The maximum number of errors to store in the error list.
- retryDelay (default: 30000): Delay in milliseconds between retries for failed file processing operations.
- archiveFolder (default: undefined): The directory where processed files will be moved if the post-processing strategy is archive or zip.
- deadLetterFolder (default: undefined): The directory where files with processing errors will be moved if the error strategy is dead-letter.
- postProcessingStrategy (default: 'delete'): Strategy for handling files after successful processing. Options are:
  - 'delete': Delete the file.
  - 'archive': Move the file to the archiveFolder.
  - 'zip': Compress the file and move it to the archiveFolder.
- errorStrategy (default: 'delete'): Strategy for handling files that encountered errors during processing. Options are:
  - 'delete': Delete the file.
  - 'ignore': Leave the file as is.
  - 'dead-letter': Move the file to the deadLetterFolder.
- retryOptions: Configuration for retrying file operations. Includes:
  - attempts (default: 3): Number of retry attempts.
  - maxWaitTime (default: 60000): Maximum total wait time in milliseconds between retries.
  - timeout (default: 10000): Timeout in milliseconds for each retry attempt.
  - waitTime (default: 1000): Initial wait time in milliseconds between retries, which may be increased based on a backoff strategy.
Here's how to create a FileFlinger instance with custom options:
import { FileFlinger } from '@mdf.js/file-flinger';

const fileFlinger = new FileFlinger('MyFileFlinger', {
  pushers: [/* Your custom pushers */],
  watchPath: '/path/to/watch',
  filePattern: '{sensor}_{measurement}_{date}.jsonl',
  keyPattern: '{sensor}/{measurement}/{date}',
  defaultValues: {},
  cwd: process.cwd(),
  maxErrors: 10,
  retryDelay: 30000,
  archiveFolder: '/path/to/archive',
  deadLetterFolder: '/path/to/dead-letter',
  postProcessingStrategy: 'archive',
  errorStrategy: 'dead-letter',
  retryOptions: {
    attempts: 3,
    maxWaitTime: 60000,
    timeout: 10000,
    waitTime: 1000,
  },
});
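Some of these options only make sense together: archiveFolder is needed by the archive and zip strategies, and deadLetterFolder by the dead-letter strategy. A minimal sketch of a check you could run before creating the instance follows. StrategyOptions and findStrategyProblems are hypothetical names used for illustration, and whether FileFlinger performs a similar validation itself is not stated in this document.

```typescript
// Hypothetical subset of the options described above (not the library's type).
interface StrategyOptions {
  postProcessingStrategy?: 'delete' | 'archive' | 'zip';
  errorStrategy?: 'delete' | 'ignore' | 'dead-letter';
  archiveFolder?: string;
  deadLetterFolder?: string;
}

/** Return a list of human-readable problems; an empty list means the options are consistent. */
function findStrategyProblems(options: StrategyOptions): string[] {
  const problems: string[] = [];
  // Both strategies default to 'delete', as documented above.
  const postProcessing = options.postProcessingStrategy ?? 'delete';
  const onError = options.errorStrategy ?? 'delete';

  if ((postProcessing === 'archive' || postProcessing === 'zip') && !options.archiveFolder) {
    problems.push(`postProcessingStrategy '${postProcessing}' requires archiveFolder`);
  }
  if (onError === 'dead-letter' && !options.deadLetterFolder) {
    problems.push("errorStrategy 'dead-letter' requires deadLetterFolder");
  }
  return problems;
}

// Usage: report problems before instantiating the FileFlinger.
for (const problem of findStrategyProblems({ postProcessingStrategy: 'zip' })) {
  console.warn(problem);
}
```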
To manage the lifecycle of the FileFlinger, you can use the following methods:
- start(): Promise<void>: Starts the FileFlinger, initializing all watchers and pushers, and begins processing files as they arrive.
- stop(): Promise<void>: Stops the FileFlinger, gracefully shutting down all watchers and pushers.
- close(): Promise<void>: Stops the FileFlinger and cleans up all resources, including closing any open file handles or network connections.
Example:
// Start the FileFlinger
await fileFlinger.start();
// The FileFlinger is now monitoring for files and processing them.
// When you need to stop the FileFlinger
await fileFlinger.stop();
// If you want to completely close and clean up resources
await fileFlinger.close();
The FileFlinger class includes a Prometheus Registry to store metrics related to the file processing pipeline. These metrics can be used to monitor the performance and health of the system.
Default metrics included in the FileFlinger are:
- api_all_job_processed_total: The total number of jobs processed, labeled by type.
- api_all_errors_job_processing_total: The total number of errors encountered during job processing, labeled by type.
- api_all_job_in_processing_total: The number of jobs currently being processed, labeled by type.
- api_publishing_job_duration_milliseconds: The duration of file processing jobs in milliseconds, labeled by type.
The type label typically represents the key generated for the file, allowing you to categorize metrics by file type or other meaningful identifiers.
Pushers should also provide metrics and health information. They should implement the metrics, status, and checks properties:
- metrics: Returns a Prometheus Registry with the pusher's metrics.
- status: Returns the health status of the pusher ('pass' or 'fail').
- checks: Returns an object containing detailed health checks for the pusher.
You can access the FileFlinger's metrics and health information:
// Access metrics
const metricsRegistry = fileFlinger.metrics;
// Access health status and checks
const fileFlingerStatus = fileFlinger.status;
const fileFlingerChecks = fileFlinger.checks;
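As an illustration of how the health information of several pushers might be combined, the sketch below derives a single overall status. HealthSource, overallStatus, and failingSources are hypothetical names, only the two statuses mentioned above are modelled, and the rule that any failing pusher fails the whole is an assumption rather than the FileFlinger's documented behavior.

```typescript
// Only the two statuses named in this document are modelled here.
type Status = 'pass' | 'fail';

// Hypothetical minimal view of a pusher for health purposes.
interface HealthSource {
  name: string;
  status: Status;
}

/** Assumed rule: the overall status fails as soon as one source fails. */
function overallStatus(sources: HealthSource[]): Status {
  return sources.some(source => source.status === 'fail') ? 'fail' : 'pass';
}

/** Names of the sources that are currently failing, for logging or alerting. */
function failingSources(sources: HealthSource[]): string[] {
  return sources.filter(source => source.status === 'fail').map(source => source.name);
}

const sources: HealthSource[] = [
  { name: 'archivePusher', status: 'pass' },
  { name: 'lakePusher', status: 'fail' },
];
console.log(overallStatus(sources), failingSources(sources));
```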
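The Keygen utility is responsible for generating unique and meaningful identifiers (keys) for processed files. These keys are used to identify and track files within the system and are crucial for organizing data in storage destinations.
The key generation process involves:
1. Parsing the file name: placeholder values are extracted from the file name according to the filePattern.
2. Merging placeholders: the extracted values are combined with the predefined placeholders and the defaultValues.
3. Generating the key: the placeholders in the keyPattern are replaced with actual values from the merged placeholders to produce the final key.
Placeholders are enclosed in curly braces {} and are used in both the filePattern and keyPattern. They are replaced with actual values during key generation.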
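The key generation process described above could be sketched roughly as follows. This is only an illustration, not the library's Keygen implementation: parseFileName and generateKey are hypothetical helpers, the predefined placeholders are left out for brevity, and letting values extracted from the file name take precedence over defaultValues is an assumption.

```typescript
type Values = Record<string, string>;

/** Escape characters that have a special meaning in a regular expression. */
function escapeRegExp(text: string): string {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

/** Extract placeholder values from a file name using a pattern such as '{a}_{b}.txt'. */
function parseFileName(fileName: string, filePattern: string): Values {
  const names: string[] = [];
  // Splitting on `{name}` tokens leaves literals at even indexes and names at odd ones.
  const source = filePattern
    .split(/\{(\w+)\}/)
    .map((part, index) => {
      if (index % 2 === 1) {
        names.push(part);
        return '(.+?)'; // one capture group per placeholder
      }
      return escapeRegExp(part);
    })
    .join('');
  const match = new RegExp(`^${source}$`).exec(fileName);
  if (!match) {
    throw new Error(`Filename [${fileName}] does not match the pattern [${filePattern}]`);
  }
  const values: Values = {};
  names.forEach((name, index) => {
    values[name] = match[index + 1] ?? '';
  });
  return values;
}

/** Merge the values and replace every placeholder of the key pattern. */
function generateKey(
  fileName: string,
  filePattern: string,
  keyPattern: string,
  defaultValues: Values = {}
): string {
  // Assumption: values extracted from the file name override the defaults.
  const merged: Values = { ...defaultValues, ...parseFileName(fileName, filePattern) };
  return keyPattern.replace(/\{(\w+)\}/g, (_token, name: string) => {
    const value = merged[name];
    if (value === undefined) {
      throw new Error(`Placeholder [${name}] not found in values`);
    }
    return value;
  });
}

console.log(
  generateKey(
    'sensor1_temperature_2023-10-24.jsonl',
    '{sensor}_{measurement}_{date}.jsonl',
    '{sensor}/{measurement}/{date}'
  )
); // sensor1/temperature/2023-10-24
```

Each placeholder is matched lazily here, so literal separators in the pattern (such as `_` or `-`) decide where one value ends and the next begins.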
The following placeholders are available by default:
- {_filename}: The base name of the file without its extension.
- {_extension}: The file extension (including the dot), e.g., .jsonl.
- {_timestamp}: The current timestamp in milliseconds since the Unix epoch.
- {_date}: The current date in YYYY-MM-DD format.
- {_time}: The current time in HH:mm:ss format.
- {_datetime}: The current date and time in YYYY-MM-DD_HH-mm-ss format.
- {_year}: The current year as a four-digit number.
- {_month}: The current month as a two-digit number (01-12).
- {_day}: The current day of the month as a two-digit number (01-31).
- {_hour}: The current hour as a two-digit number (00-23).
- {_minute}: The current minute as a two-digit number (00-59).
- {_second}: The current second as a two-digit number (00-59).
You can define custom placeholders by specifying them in the filePattern. These placeholders extract corresponding values from the file name.
Example:
- File name: sensor1_temperature_2023-10-24.jsonl
- File pattern: {sensor}_{measurement}_{date}.jsonl
- Extracted placeholders: sensor, measurement, date
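For reference, the predefined placeholders listed earlier could be computed along the following lines. predefinedValues is a hypothetical helper written for this illustration, and the use of UTC is an assumption, since the time zone applied by Keygen is not specified here.

```typescript
/** Left-pad a number with zeros to two digits. */
const pad = (value: number): string => String(value).padStart(2, '0');

/** Build the predefined placeholder values for a file name at a given instant (UTC assumed). */
function predefinedValues(fileName: string, now: Date): Record<string, string> {
  // A leading dot (as in '.bashrc') is not treated as an extension separator.
  const dot = fileName.lastIndexOf('.');
  const hasExtension = dot > 0;

  const year = String(now.getUTCFullYear());
  const month = pad(now.getUTCMonth() + 1); // getUTCMonth() is zero-based
  const day = pad(now.getUTCDate());
  const hour = pad(now.getUTCHours());
  const minute = pad(now.getUTCMinutes());
  const second = pad(now.getUTCSeconds());

  return {
    _filename: hasExtension ? fileName.slice(0, dot) : fileName,
    _extension: hasExtension ? fileName.slice(dot) : '',
    _timestamp: String(now.getTime()),
    _date: `${year}-${month}-${day}`,
    _time: `${hour}:${minute}:${second}`,
    _datetime: `${year}-${month}-${day}_${hour}-${minute}-${second}`,
    _year: year,
    _month: month,
    _day: day,
    _hour: hour,
    _minute: minute,
    _second: second,
  };
}

console.log(predefinedValues('sensor1_temperature.jsonl', new Date()));
```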
The Keygen class accepts an options object to customize its behavior:
- filePattern: A pattern used to parse the file name and extract placeholders.
- keyPattern: A pattern used to generate the key by replacing placeholders with actual values.
- defaultValues: An object containing default values for placeholders that may not be present in the file name.
Default Options:
const DEFAULT_KEY_GEN_OPTIONS: Required<KeygenOptions> = {
  filePattern: '*', // Matches any file name
  keyPattern: '{_filename}', // Uses the file name without extension as the key
  defaultValues: {}, // No default values provided
};
Description: Generate a key using default settings.
Configuration:
# filePattern: undefined
keyPattern: '{_filename}'
# defaultValues: {}
File name: myfile.txt
Generated key: myfile
Explanation:
- Since filePattern is undefined, any file name matches.
- The keyPattern {_filename} uses the file name without the extension.
- The resulting key is 'myfile'.
Description: Generate a key by extracting custom placeholders from the file name.
Configuration:
filePattern: '{sensor}_{measurement}_{date}.jsonl'
keyPattern: '{sensor}/{measurement}/{date}'
# defaultValues: {}
File name: sensor1_temperature_2023-10-24.jsonl
Generated key: sensor1/temperature/2023-10-24
Explanation:
- The filePattern extracts sensor, measurement, and date from the file name.
- The keyPattern constructs the key using these placeholders.
Description: Use default values for placeholders not present in the file name.
Configuration:
filePattern: '{sensor}_{measurement}_{date}.jsonl'
keyPattern: '{sensor}/{measurement}/{date}/{location}'
defaultValues:
  location: 'defaultLocation'
File name: sensor1_temperature_2023-10-24.jsonl
Generated key: sensor1/temperature/2023-10-24/defaultLocation
Explanation:
- The location placeholder is not present in the file name.
- The defaultValues provide a value for location.
Description: Generate a key that includes current date components.
Configuration:
filePattern: '{sensor}_{measurement}.jsonl'
keyPattern: '{sensor}/{measurement}/{_year}/{_month}/{_day}'
# defaultValues: {}
File name: sensor1_temperature.jsonl
Generated key (when processed on 2024-11-03): sensor1/temperature/2024/11/03
Explanation:
- {_year}, {_month}, {_day} are replaced with the current date components.
Description: Generate a key using complex file patterns and default values.
Configuration:
filePattern: '{sensor}_{measurement}_{year}-{month}-{day}_{end}.jsonl'
keyPattern: '{sensor}/{measurement}/{year}/{month}/{day}/data_{source}'
defaultValues:
  source: 'myFileFlinger1'
File name: mySensor_flowMeter1_2024-12-30_2024-12-31.jsonl
Generated key: mySensor/flowMeter1/2024/12/30/data_myFileFlinger1
Explanation:
- sensor, measurement, year, month, day, and end are extracted from the file name.
- The source placeholder is provided via defaultValues.
During key generation, some errors can occur. These errors are emitted as error events and can be handled by listening to the FileFlinger's error event.
- Filename Does Not Match Pattern: If the file name does not match the filePattern, an error is emitted.
  Error Message: 'Filename [invalid_filename.jsonl] does not match the pattern [{sensor}_{measurement}_{date}.jsonl]'
- Placeholder Not Found in Values: If a placeholder in the keyPattern is not found in the merged placeholders, an error is emitted.
  Error Message: 'Error generating a key based on pattern [{sensor}/{measurement}/{date}/{unknown}] for file [sensor1_temperature_2023-10-24.jsonl]: Placeholder [unknown] not found in values'
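The "placeholder not found" error can be anticipated before any file is processed by checking the patterns up front. The following sketch is not part of the library: placeholdersIn and unresolvedPlaceholders are hypothetical helpers, and the check assumes that the predefined placeholders documented earlier are always available.

```typescript
// Predefined placeholder names, as listed in this document.
const PREDEFINED: string[] = [
  '_filename', '_extension', '_timestamp', '_date', '_time', '_datetime',
  '_year', '_month', '_day', '_hour', '_minute', '_second',
];

/** List the placeholder names that appear in a pattern, in order of appearance. */
function placeholdersIn(pattern: string): string[] {
  // Splitting on `{name}` tokens leaves the names at the odd indexes.
  return pattern.split(/\{(\w+)\}/).filter((_part, index) => index % 2 === 1);
}

/** Return the keyPattern placeholders that no source can provide a value for. */
function unresolvedPlaceholders(
  filePattern: string | undefined,
  keyPattern: string,
  defaultValues: Record<string, string> = {}
): string[] {
  const known = new Set<string>([
    ...PREDEFINED,
    ...Object.keys(defaultValues),
    ...(filePattern ? placeholdersIn(filePattern) : []),
  ]);
  return placeholdersIn(keyPattern).filter(name => !known.has(name));
}

console.log(
  unresolvedPlaceholders(
    '{sensor}_{measurement}_{date}.jsonl',
    '{sensor}/{measurement}/{date}/{unknown}'
  )
); // [ 'unknown' ]
```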
To avoid these errors:
- Define filePattern and keyPattern: Explicitly specify these patterns to ensure keys are generated as expected.
- Ensure that all placeholders used in the keyPattern are either extracted from the file name, provided in defaultValues, or are predefined placeholders.
Copyright 2024 Mytra Control S.L. All rights reserved.
Use of this source code is governed by an MIT-style license that can be found in the LICENSE file or at https://opensource.org/licenses/MIT.