---
title: Cloud storage and filesystem
description: dlt-verified source for reading files from cloud storage and local file system
keywords: [file system, files, filesystem, readers source, cloud storage, object storage, local file system]
---


The filesystem source allows seamless loading of files from the following locations:
* AWS S3
* Google Cloud Storage
* Google Drive
* Azure Blob Storage
* remote filesystem (via SFTP)
* local filesystem
* public CDN

The filesystem source natively supports [CSV](../../file-formats/csv.md), [Parquet](../../file-formats/parquet.md), and [JSONL](../../file-formats/jsonl.md) files and allows customization for loading any type of structured file.


To load unstructured data (PDF, plain text, e-mail), please refer to the [unstructured data source](https://github.com/dlt-hub/verified-sources/tree/master/sources/unstructured_data).

## How filesystem source works

The filesystem source doesn't just give you an easy way to load data from both remote and local files; it also comes with a powerful set of tools that let you customize the loading process to fit your specific needs.

The filesystem source loads data in two steps:
1. It [accesses the files](#1-initialize-a-filesystem-resource) in your remote or local file storage without actually reading the content yet. At this point, you can [filter files by metadata or name](#7-filter-files). You can also set up [incremental loading](#5-incremental-loading) to load only new files.
2. [The reader](#2-choose-the-right-reader) reads the files' content and yields the records. At this step, you can filter out the actual data, enrich records with metadata from files, or [perform incremental loading](#load-new-records-based-on-a-specific-column) based on the file content.

For the most common cases, we provide the `readers` source, which performs both steps in a single call.

## Quick example

Let's see how to load a parquet file from a public website. The following example downloads a single file of yellow taxi trip records from the [NYC Taxi & Limousine Commission](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) website and loads it into DuckDB.

```py
import datetime as dt

import dlt
from dlt.sources.filesystem import filesystem, read_parquet

filesystem_resource = filesystem(
  bucket_url="https://d37ci6vzurychx.cloudfront.net/trip-data",
  file_glob=f"yellow_tripdata_{(dt.datetime.now() - dt.timedelta(days=90)).strftime('%Y-%m')}.parquet",
)
filesystem_pipe = filesystem_resource | read_parquet()

# We load the data into the yellow_tripdata table
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe.with_name("yellow_tripdata"))
print(load_info)
print(pipeline.last_trace.last_normalize_info)
```

The next example shows an efficient incremental load of Parquet files from a remote location, in this case an S3 bucket.

```py
import dlt
from dlt.sources.filesystem import filesystem, read_parquet

filesystem_resource = filesystem(
  bucket_url="s3://my-bucket/files",
  file_glob="**/*.parquet",
  incremental=dlt.sources.incremental("modification_date")
)
filesystem_pipe = filesystem_resource | read_parquet()

# We load the data into the table_name table
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe.with_name("table_name"))
print(load_info)
print(pipeline.last_trace.last_normalize_info)
```

With `readers` source:

```py
import dlt
from dlt.sources.filesystem import readers

parquet_files = readers(
  bucket_url="s3://my-bucket/files",
  file_glob="**/*.parquet",
  incremental=dlt.sources.incremental("modification_date")
).read_parquet()

# We load the data into the table_name table
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(parquet_files.with_name("table_name"))
print(load_info)
print(pipeline.last_trace.last_normalize_info)
```

## Setup

### Prerequisites

Please make sure the `dlt` library is installed. Refer to the [installation guide](../../../intro).

### Initialize the filesystem source

To get started with your data pipeline, follow these steps:

1. Enter the following command:

   ```sh
   dlt init filesystem duckdb
   ```

   The [dlt init command](../../../reference/command-line-interface) will initialize
   [the pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/filesystem_pipeline.py)
   with the filesystem as the source and [duckdb](../../destinations/duckdb.md) as the destination.

2. If you would like to use a different destination, simply replace `duckdb` with the name of your
   preferred [destination](../destinations).

3. After running this command, a new directory will be created with the necessary files and
   configuration settings to get started.

## Configuration

### Get credentials

<Tabs
  groupId="filesystem-type"
  defaultValue="aws"
  values={[
    {"label": "AWS S3", "value": "aws"},
    {"label": "GCS/GDrive", "value": "gcp"},
    {"label": "Azure", "value": "azure"},
    {"label": "SFTP", "value": "sftp"},
    {"label": "Local filesystem", "value": "local"},
]}>

<TabItem value="aws">

To get AWS keys for S3 access:

1. Access IAM in the AWS Console.
2. Select "Users", choose a user, and open "Security credentials".
3. Click "Create access key" for AWS ID and Secret Key.

For more info, see
[AWS official documentation.](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html)

</TabItem>

<TabItem value="gcp">

To get GCS/GDrive access:

1. Log in to [console.cloud.google.com](http://console.cloud.google.com/).
2. Create a [service account](https://cloud.google.com/iam/docs/service-accounts-create#creating).
3. Enable "Cloud Storage API" / "Google Drive API"; see
   [Google's guide](https://support.google.com/googleapi/answer/6158841?hl=en).
4. In IAM & Admin > Service Accounts, find your account, click the three-dot menu > "Manage Keys" >
   "ADD KEY" > "CREATE" to get a JSON credential file.
5. Grant the service account appropriate permissions for cloud storage access.
6. In the case of GDrive, share the respective folders/files with the service account.

For more info, see how to
[create a service account](https://support.google.com/a/answer/7378726?hl=en).

</TabItem>

<TabItem value="azure">

To obtain Azure blob storage access:

1. Go to the Azure Portal (portal.azure.com).
2. Select "Storage accounts" > your storage.
3. Click "Settings" > "Access keys".
4. View the account name and two keys (primary/secondary). Keep keys confidential.

For more info, see
[Azure official documentation](https://learn.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal).

</TabItem>

<TabItem value="sftp">

dlt supports several authentication methods:

1. Key-based authentication
2. SSH Agent-based authentication
3. Username/Password authentication
4. GSS-API authentication

Learn more about SFTP authentication options in the [SFTP section](../destinations/filesystem#sftp). To obtain credentials, contact your server administrator.
</TabItem>

<TabItem value="local">
You don't need any credentials for the local filesystem.
</TabItem>

</Tabs>

### Add credentials to dlt pipeline

To provide credentials to the filesystem source, you can use [any method available](../../../general-usage/credentials/setup) in dlt.
One of the easiest ways is to use configuration files. The `.dlt` folder in your working directory contains two files: `config.toml` and `secrets.toml`. Sensitive information, like passwords and access tokens, should only be put into `secrets.toml`, while any other configuration, like the path to a bucket, can be specified in `config.toml`.

<Tabs
  groupId="filesystem-type"
  defaultValue="aws"
  values={[
    {"label": "AWS S3", "value": "aws"},
    {"label": "GCS/GDrive", "value": "gcp"},
    {"label": "Azure", "value": "azure"},
    {"label": "SFTP", "value": "sftp"},
    {"label": "Local filesystem", "value": "local"},
]}>

<TabItem value="aws">

```toml
# secrets.toml
[sources.filesystem.credentials]
aws_access_key_id="Please set me up!"
aws_secret_access_key="Please set me up!"

# config.toml
[sources.filesystem]
bucket_url="s3://<bucket_name>/<path_to_files>/"
```
</TabItem>

<TabItem value="azure">

```toml
# secrets.toml
[sources.filesystem.credentials]
azure_storage_account_name="Please set me up!"
azure_storage_account_key="Please set me up!"

# config.toml
[sources.filesystem]
bucket_url="az://<container_name>/<path_to_files>/"
```
</TabItem>

<TabItem value="gcp">

```toml
# secrets.toml
[sources.filesystem.credentials]
client_email="Please set me up!"
private_key="Please set me up!"
project_id="Please set me up!"

# config.toml
# gdrive
[gdrive_pipeline_name.sources.filesystem]
bucket_url="gdrive://<folder_name>/<subfolder_or_file_path>/" # set file_glob="" if file path

# config.toml
# Google storage
[gstorage_pipeline_name.sources.filesystem]
bucket_url="gs://<bucket_name>/<path_to_files>/"
```
</TabItem>

<TabItem value="sftp">

Learn how to set up SFTP credentials for each authentication method in the [SFTP section](../destinations/filesystem#sftp).
For example, in the case of key-based authentication, you can configure the source the following way:

```toml
# secrets.toml
[sources.filesystem.credentials]
sftp_username = "foo"
sftp_key_filename = "/path/to/id_rsa"     # Replace with the path to your private key file
sftp_key_passphrase = "your_passphrase"   # Optional: passphrase for your private key

# config.toml
[sources.filesystem]
bucket_url = "sftp://[hostname]/[path]"
```
</TabItem>

<TabItem value="local">

You can use both native local filesystem paths and the `file://` URI. Absolute, relative, and UNC Windows paths are supported.

You could provide an absolute filepath:

```toml
# config.toml
[sources.filesystem]
bucket_url='file://Users/admin/Documents/csv_files'
```

Or skip the scheme and provide the local path in a format native to your operating system. For example, for Windows:

```toml
[sources.filesystem]
bucket_url='~\Documents\csv_files\'
```

</TabItem>

</Tabs>

You can also specify the credentials using environment variables. The name of an environment variable differs slightly from the corresponding TOML key: simply replace dots `.` with double underscores `__`:

```sh
export SOURCES__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID="Please set me up!"
export SOURCES__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY="Please set me up!"
```

:::tip
dlt supports more ways of authorizing with cloud storage, including identity-based and default credentials. To learn more about adding credentials to your pipeline, please refer to the [Configuration and secrets section](../../../general-usage/credentials/complex_types#gcp-credentials).
:::

## Usage

The filesystem source is quite unique since it provides you with building blocks for loading data from files. First, it iterates over files in the storage and then processes each file to yield the records. Usually, you need two resources:

1. The `filesystem` resource enumerates files in a selected bucket using a glob pattern, returning details as `FileItem` in customizable page sizes.
2. One of the available transformer resources, which processes each file with a specific transforming function and yields the records.

### 1. Initialize a `filesystem` resource

:::note
If you use just the `filesystem` resource, it will only list files in the storage based on glob parameters and yield the files [metadata](#fileitem-fields). The `filesystem` resource itself does not read or copy files.
:::

All parameters of the resource can be specified directly in code:
```py
from dlt.sources.filesystem import filesystem

filesystem_source = filesystem(
  bucket_url="file://Users/admin/Documents/csv_files",
  file_glob="*.csv"
)
```
or taken from the config:

* python code:

  ```py
  from dlt.sources.filesystem import filesystem

  filesystem_source = filesystem()
  ```

* configuration file:
  ```toml
  [sources.filesystem]
  bucket_url="file://Users/admin/Documents/csv_files"
  file_glob="*.csv"
  ```

Full list of `filesystem` resource parameters:

* `bucket_url` - full URL of the bucket (can be a relative path in the case of the local filesystem).
* `credentials` - cloud storage credentials or an `AbstractFilesystem` instance (should be empty for the local filesystem). We recommend not specifying this parameter in code but putting it in a secrets file instead.
* `file_glob` - file filter in glob format. Defaults to listing all files in the bucket URL non-recursively.

  :::info
  If the `bucket_url` is a specific file path, set `file_glob=""`.
  :::

* `files_per_page` - number of files processed at once. The default value is `100`.
* `extract_content` - if true, the content of the file will be read and returned in the resource. The default value is `False`.
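
For instance, here is a minimal sketch (the bucket URL and file name are placeholders) that combines several of these parameters to read one specific file:

```py
from dlt.sources.filesystem import filesystem

single_file = filesystem(
  bucket_url="s3://my-bucket/reports/2024-01.csv",  # placeholder path to one specific file
  file_glob="",            # disable globbing because bucket_url already points at a file
  files_per_page=50,       # yield file items in pages of 50 (only relevant with many files)
  extract_content=True,    # also read the file content into the `file_content` field
)
```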

### 2. Choose the right reader

The current implementation of the filesystem source natively supports three file types: CSV, Parquet, and JSONL.
You can apply any of the above or [create your own readers](#create-your-own-readers). To apply the selected transformer resource, use pipe notation `|`:

```py
from dlt.sources.filesystem import filesystem, read_csv

filesystem_pipe = filesystem(
  bucket_url="file://Users/admin/Documents/csv_files",
  file_glob="*.csv"
) | read_csv()
```

#### Available readers

- `read_csv()` - processes CSV files using [Pandas](https://pandas.pydata.org/). Control batch size with `chunksize` (defaults to 10000 rows). Accepts additional `**pandas_kwargs` passed to `pd.read_csv()`.
- `read_jsonl()` - processes JSONL files chunk by chunk. Control batch size with `chunksize` (defaults to 1000 lines per batch).
- `read_parquet()` - processes Parquet files using [PyArrow](https://arrow.apache.org/docs/python/). Control memory usage with `chunksize` (defaults to 1000 rows per batch). Set `use_pyarrow=True` to yield native `pyarrow.RecordBatch` objects instead of Python dictionaries for zero-copy operations.
- `read_csv_duckdb()` - processes CSV files using DuckDB, which usually shows better performance than Pandas. Control batch size with `chunk_size` (defaults to 5000 rows). Set `use_pyarrow=True` to yield Arrow format instead of JSON. Accepts additional `**duckdb_kwargs` passed to DuckDB's `read_csv()`.
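
As a sketch (the bucket URL and parameter values are illustrative), reader parameters are passed directly to the transformer, for example, to tune the batch size or forward pandas options:

```py
from dlt.sources.filesystem import filesystem, read_csv

# Read semicolon-separated CSV files in batches of 5000 rows;
# `sep` is forwarded to pandas' read_csv via **pandas_kwargs
filesystem_pipe = filesystem(
  bucket_url="s3://my-bucket/files", file_glob="*.csv"
) | read_csv(chunksize=5000, sep=";")
```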

:::tip
We advise that you give each resource a [specific name](../../../general-usage/resource#duplicate-and-rename-resources) before loading with `pipeline.run`. This will ensure that data goes to a table with the name you want and that each pipeline uses a [separate state for incremental loading.](../../../general-usage/state#read-and-write-pipeline-state-in-a-resource)
:::

### 3. Create and run a pipeline

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

filesystem_pipe = filesystem(bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv") | read_csv()
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
info = pipeline.run(filesystem_pipe)
print(info)
```

For more information on how to create and run the pipeline, read the [Walkthrough: Run a pipeline](../../../walkthroughs/run-a-pipeline).

### 4. Apply hints

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

filesystem_pipe = filesystem(bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv") | read_csv()
# Tell dlt to merge on date
filesystem_pipe.apply_hints(write_disposition="merge", merge_key="date")

# We load the data into the table_name table
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe.with_name("table_name"))
print(load_info)
```

### 5. Incremental loading

Here are a few simple ways to load your data incrementally:

1. [Load files based on modification date](#load-files-based-on-modification-date). Only load files that have been updated since the last time `dlt` processed them. `dlt` checks the files' metadata (like the modification date) and skips those that haven't changed.
2. [Load new records based on a specific column](#load-new-records-based-on-a-specific-column). You can load only the new or updated records by looking at a specific column, like `updated_at`. Unlike the first method, this approach would read all files every time and then filter the records which were updated.
3. [Combine loading only updated files and records](#combine-loading-only-updated-files-and-records). Finally, you can combine both methods. It could be useful if new records could be added to existing files, so you not only want to filter the modified files, but also the modified records.

#### Load files based on modification date
For example, to load only new CSV files with [incremental loading](../../../general-usage/incremental-loading), you can use the `apply_hints` method.

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

# This configuration will only consider new CSV files
new_files = filesystem(
  bucket_url="s3://bucket_name",
  file_glob="directory/*.csv",
  incremental=dlt.sources.incremental("modification_date")
)

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run((new_files | read_csv()).with_name("csv_files"))
print(load_info)
```

#### Load new records based on a specific column

In this example, we load only new records based on the field called `updated_at`. This method may be useful if you are not able to
filter files by modification date because, for example, all files are modified each time a new record appears.

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

# We consider all CSV files
all_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")

# But filter out only updated records
filesystem_pipe = (all_files | read_csv())
filesystem_pipe.apply_hints(incremental=dlt.sources.incremental("updated_at"))
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)
```

#### Combine loading only updated files and records

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

# This configuration will only consider modified CSV files
new_files = filesystem(
  bucket_url="s3://bucket_name",
  file_glob="directory/*.csv", incremental=dlt.sources.incremental("modification_date")
)

# And in each modified file, we filter out only updated records
filesystem_pipe = (new_files | read_csv())
filesystem_pipe.apply_hints(incremental=dlt.sources.incremental("updated_at"))
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)
```

### 6. Split large incremental loads
If you have many files to process, or the files are large, you may choose to split pipeline runs into smaller chunks (a single file being the smallest unit). There are
two methods to do that:
* **Partitioning**, where you split the source data into several ranges, load them (possibly in parallel), and then continue to load data incrementally.
* **Split loading**, where you load the data sequentially in small chunks.

**Partitioning works as follows:**

1. Obtain a list of files, e.g., by listing your resource: `files = list(filesystem(...))`.
2. Order the list by `modification_date` or `file_url` and split it into equal chunks.
3. For each chunk, find the min and max of the range.
4. Use [incremental with `end_value`](../../../general-usage/incremental/cursor.md#using-end_value-for-backfill) for the backfill.
5. Load each partition in a loop or in parallel (e.g., in a separate process).
6. Continue regular incremental loading with `initial_value` set to the value at the end of the range (`modification_date` or `file_url`)
and make the start of the range open to avoid duplicates.
```py
import dlt
from dlt.sources.filesystem import filesystem

# placeholder bucket URL; replace with your own
bucket_url = "s3://my-bucket/files"

# list and sort all csv files for deterministic partitioning
fs_ = filesystem(bucket_url=bucket_url, file_glob="**/*.csv")
# we assume that file paths are named so files added later in time come at the end when sorted
file_urls = sorted([file["file_url"] for file in fs_])

pipeline = dlt.pipeline("test_partitioned_load", destination="duckdb")

# load each partition using initial_value and end_value
for i in range(len(file_urls) // 4 + 1):
    files_range = file_urls[i * 4 : (i + 1) * 4]
    if not files_range:
        continue

    # close both ranges to load inclusively
    file_name_incremental = dlt.sources.incremental(
        "file_url",
        initial_value=files_range[0],
        end_value=files_range[-1],
        range_start="closed",
        range_end="closed",
    )
    file_resource = filesystem(
        bucket_url=bucket_url, file_glob="**/*.csv", incremental=file_name_incremental
    ).with_name("files")
    load_info = pipeline.run(file_resource)
    print(load_info)

# note we could also extract max modification_time and use it for subsequent incremental loading
file_name_incremental = dlt.sources.incremental(
    "file_url",
    initial_value=file_urls[-1],
    range_start="open",
)
file_resource = filesystem(
    bucket_url=bucket_url, file_glob="**/*.csv", incremental=file_name_incremental
).with_name("files")
# will write initial incremental state
pipeline.run(file_resource)
```

Please read the [notes on parallelism](../../../general-usage/incremental/cursor.md#partition-large-backfills).

**Split loading works as follows:**

1. Use the `incremental` argument with **row_order** set.
2. Limit the number of files returned per page when creating the `filesystem` instance to get manageable chunks.
3. Limit the resource by number of pages or time.
4. Run the pipeline in a loop for as long as it is not empty.

```py
import dlt
from dlt.sources.filesystem import filesystem

# placeholder bucket URL and pipeline; replace with your own
bucket_url = "s3://my-bucket/files"
pipeline = dlt.pipeline(pipeline_name="split_load", destination="duckdb")

# return files in order of modification_date
incremental_ = dlt.sources.incremental("modification_date", row_order="asc")  # type: ignore
# each page contains only one file
fs_ = filesystem(bucket_url=bucket_url, file_glob="csv/*", incremental=incremental_, files_per_page=1)

# process one file in each run, you could also use max_time to process files i.e. for an hour
while not pipeline.run(fs_.with_name("files").add_limit(1)).is_empty:
    print(pipeline.last_trace.last_load_info)
```
**Note that you must set `row_order` on the incremental to avoid missing files.**

### 7. Filter files

If you need to filter out files based on their metadata, you can easily do this using the `add_filter` method.
Within your filtering function, you'll have access to [any field](#fileitem-fields) of the `FileItem` representation.

#### Filter by name
To filter only files that have `London` and `Berlin` in their names, you can do the following:
```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

# Filter files accessing file_name field
filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
filtered_files.add_filter(lambda item: ("London" in item["file_name"]) or ("Berlin" in item["file_name"]))

filesystem_pipe = (filtered_files | read_csv())
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)
```

:::tip
You could also use `file_glob` to filter files by name. It works very well in simple cases, for example, filtering by extension:

```py
from dlt.sources.filesystem import filesystem

filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="**/*.json")
```
:::

#### Filter by size

If for some reason you only want to load small files, you can also do that:

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

MAX_SIZE_IN_BYTES = 10

# Filter files accessing size_in_bytes field
filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
filtered_files.add_filter(lambda item: item["size_in_bytes"] < MAX_SIZE_IN_BYTES)

filesystem_pipe = (filtered_files | read_csv())
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)
```

## Standalone filesystem resource

You can use the [standalone filesystem](../../../general-usage/resource#declare-a-standalone-resource) resource to list files in cloud storage or a local filesystem. This allows you to customize file readers or manage files using [fsspec](https://filesystem-spec.readthedocs.io/en/latest/index.html).

```py
import dlt
from dlt.sources.filesystem import filesystem

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
files = filesystem(bucket_url="s3://my_bucket/data", file_glob="csv_folder/*.csv")
pipeline.run(files)
```

The filesystem resource ensures consistent file representation across bucket types and offers methods to access and read data. You can quickly build pipelines to:

- Extract text from PDFs ([unstructured data source](https://github.com/dlt-hub/verified-sources/tree/master/sources/unstructured_data)).
- Stream large file content directly from buckets.
- Copy files locally ([copy files](#copy-files-locally)).

### `FileItem` representation

- All dlt sources/resources that yield files follow the [FileItem](https://github.com/dlt-hub/dlt/blob/devel/dlt/common/storages/fsspec_filesystem.py#L40) contract.
- File content is typically not loaded (you can control it with the `extract_content` parameter of the filesystem resource). Instead, full file info and methods to access content are available.
- Users can request an authenticated [fsspec AbstractFileSystem](https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/spec.html#AbstractFileSystem) instance.

#### `FileItem` fields

- `file_url` - complete URL of the file (e.g., `s3://bucket-name/path/file`). This field serves as a primary key.
- `file_name` - name of the file from the bucket URL.
- `relative_path` - set when doing `glob`; the file's path relative to the `bucket_url` argument.
- `mime_type` - file's MIME type. It is sourced from the bucket provider or inferred from its extension.
- `modification_date` - file's last modification time (format: `pendulum.DateTime`).
- `size_in_bytes` - file size.
- `file_content` - content, provided upon request.

:::info
When using a nested or recursive glob pattern, `relative_path` will include the file's path relative to `bucket_url`. For instance, using the resource: `filesystem("az://dlt-ci-test-bucket/standard_source/samples", file_glob="met_csv/A801/*.csv")` will produce file names relative to the `/standard_source/samples` path, such as `met_csv/A801/A881_20230920.csv`. For local filesystems, POSIX paths (using "/" as separator) are returned.
:::

### File manipulation

[FileItem](https://github.com/dlt-hub/dlt/blob/devel/dlt/common/storages/fsspec_filesystem.py#L40), backed by a dictionary implementation, offers these helpers:

- `read_bytes()` - returns the file content as bytes.
- `open()` - returns a file object for reading the file.
- `filesystem` - gives access to an authorized `AbstractFilesystem` instance with standard fsspec methods.
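
For instance, a minimal sketch (with a placeholder bucket URL) that lists files and inspects their raw content without attaching a reader:

```py
from dlt.sources.filesystem import filesystem

files = filesystem(bucket_url="s3://my-bucket/data", file_glob="*.txt")
for item in files:
    # read_bytes() loads the whole file into memory; prefer open() to stream larger files
    print(item["file_name"], len(item.read_bytes()))
```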

## Create your own readers

Although the `filesystem` resource yields the files from cloud storage or a local filesystem, you need to apply a transformer resource to retrieve the records from files. dlt natively supports three file types: [CSV](../../file-formats/csv.md), [Parquet](../../file-formats/parquet.md), and [JSONL](../../file-formats/jsonl.md) (more details in [filesystem transformer resource](#2-choose-the-right-reader)).

But you can easily create your own. To do this, you just need a function that takes a `FileItemDict` iterator as input and yields a list of records (recommended for performance) or individual records.
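
For instance, here is a hypothetical transformer (a sketch, not part of the dlt library) that reads CSV files and attaches the source file name to every record, which is one way to enrich records with file metadata:

```py
from typing import Iterator

import dlt
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.common.typing import TDataItems

@dlt.transformer
def read_csv_with_filename(
    items: Iterator[FileItemDict], chunksize: int = 10000
) -> Iterator[TDataItems]:
    import pandas as pd

    for file_obj in items:
        with file_obj.open() as file:
            for df in pd.read_csv(file, chunksize=chunksize):
                # Enrich every record with the name of the file it came from
                df["file_name"] = file_obj["file_name"]
                yield df.to_dict(orient="records")
```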

### Example: read data from Excel files

The code below sets up a pipeline that reads from an Excel file using a standalone transformer:

```py
from typing import Iterator

import dlt
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.common.typing import TDataItems
from dlt.sources.filesystem import filesystem

BUCKET_URL = "s3://my_bucket/data"

# Define a standalone transformer to read data from an Excel file.
@dlt.transformer
def read_excel(
    items: Iterator[FileItemDict], sheet_name: str
) -> Iterator[TDataItems]:
    # Import the required pandas library.
    import pandas as pd

    # Iterate through each file item.
    for file_obj in items:
        # Open the file object.
        with file_obj.open() as file:
            # Read from the Excel file and yield its content as dictionary records.
            yield pd.read_excel(file, sheet_name).to_dict(orient="records")

# Set up the pipeline to fetch a specific Excel file from a filesystem (bucket).
example_xls = filesystem(
    bucket_url=BUCKET_URL, file_glob="../directory/example.xlsx"
) | read_excel("example_table")   # Pass the data through the transformer to read the "example_table" sheet.

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb", dataset_name="example_xls_data")
# Execute the pipeline and load the extracted data into the "duckdb" destination.
load_info = pipeline.run(example_xls.with_name("example_xls_data"))
# Print the loading information.
print(load_info)
```

### Example: read data from XML files

You can use any third-party library to parse an `xml` file (e.g., [BeautifulSoup](https://pypi.org/project/beautifulsoup4/), [pandas](https://pandas.pydata.org/docs/reference/api/pandas.read_xml.html)). In the following example, we will be using the [xmltodict](https://pypi.org/project/xmltodict/) Python library.

```py
from typing import Iterator

import dlt
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.common.typing import TDataItems
from dlt.sources.filesystem import filesystem

BUCKET_URL = "s3://my_bucket/data"

# Define a standalone transformer to read data from an XML file.
@dlt.transformer
def read_xml(items: Iterator[FileItemDict]) -> Iterator[TDataItems]:
    # Import the required xmltodict library.
    import xmltodict

    # Iterate through each file item.
    for file_obj in items:
        # Open the file object.
        with file_obj.open() as file:
            # Parse the file to dict records.
            yield xmltodict.parse(file.read())

# Set up the pipeline to fetch a specific XML file from a filesystem (bucket).
example_xml = filesystem(
    bucket_url=BUCKET_URL, file_glob="../directory/example.xml"
) | read_xml()   # Pass the data through the transformer

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb", dataset_name="example_xml_data")
# Execute the pipeline and load the extracted data into the "duckdb" destination.
load_info = pipeline.run(example_xml.with_name("example_xml_data"))

# Print the loading information.
print(load_info)
```

## Clean files after loading

You can get an fsspec client from the filesystem resource after it has been extracted, e.g., to delete processed files. The filesystem module contains a convenient method `fsspec_from_resource` that can be used as follows:

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv
from dlt.sources.filesystem.helpers import fsspec_from_resource

# Get filesystem source.
gs_resource = filesystem("gs://ci-test-bucket/")
# Extract files.
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
pipeline.run(gs_resource | read_csv())
# Get fs client.
fs_client = fsspec_from_resource(gs_resource)
# Do any operation.
fs_client.ls("ci-test-bucket/standard_source/samples")
```

## Copy files locally

To copy files locally, add a step in the filesystem resource and then load the listing to the database:

```py
import os

import dlt
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.sources.filesystem import filesystem

def _copy(item: FileItemDict) -> FileItemDict:
    # Instantiate fsspec and copy file
    dest_file = os.path.join("./local_folder", item["file_name"])
    # Create destination folder
    os.makedirs(os.path.dirname(dest_file), exist_ok=True)
    # Download file
    item.fsspec.download(item["file_url"], dest_file)
    # Return file item unchanged
    return item

BUCKET_URL = "gs://ci-test-bucket/"

# Use recursive glob pattern and add file copy step
downloader = filesystem(BUCKET_URL, file_glob="**").add_map(_copy)

# NOTE: You do not need to load any data to execute extract; below, we obtain
# a list of files in a bucket and also copy them locally
listing = list(downloader)
print(listing)
# Download to table "listing"
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(
    downloader.with_name("listing"), write_disposition="replace"
)
# Pretty print the information on data that was loaded
print(load_info)
print(listing)
print(pipeline.last_trace.last_normalize_info)
```


## Troubleshoot

### Access extremely long file paths

Windows supports paths up to 255 characters. When you access a path longer than 255 characters, you'll see a `FileNotFound` exception.

To work around this limit, you can use [extended paths](https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry).
**Note that Python glob does not work with extended UNC paths**, so you will not be able to use glob patterns with them:

```toml
[sources.filesystem]
bucket_url = '\\?\C:\a\b\c'
```

### If you get an empty list of files

If you are running a dlt pipeline with the filesystem source and get zero records, we recommend you check
the configuration of `bucket_url` and `file_glob` parameters.

For example, with Azure Blob Storage, people sometimes mistake the account name for the container name. Make sure you've set up a URL as `"az://<container name>/"`.

Also, please reference the [glob](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.glob) function to configure the resource correctly. Use `**` to include recursive files. Note that the local filesystem supports full Python [glob](https://docs.python.org/3/library/glob.html#glob.glob) functionality, while cloud storage supports a restricted `fsspec` [version](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.glob).
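
When in doubt, a quick sanity check is to list the files your configuration actually matches before wiring up a full pipeline (the bucket URL and glob below are placeholders):

```py
from dlt.sources.filesystem import filesystem

files = filesystem(bucket_url="az://my-container/", file_glob="**/*.csv")
print([f["file_url"] for f in files])
```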

## Additional setup guides
- [Load data from Box Platform API to Azure Cloud Storage in python with dlt](https://dlthub.com/docs/pipelines/box/load-data-with-python-from-box-to-filesystem-az)
- [Load data from ClickHouse Cloud to The Local Filesystem in python with dlt](https://dlthub.com/docs/pipelines/clickhouse_cloud/load-data-with-python-from-clickhouse_cloud-to-filesystem-local)
- [Load data from Clubhouse to The Local Filesystem in python with dlt](https://dlthub.com/docs/pipelines/clubhouse/load-data-with-python-from-clubhouse-to-filesystem-local)
- [Load data from Soundcloud to The Local Filesystem in python with dlt](https://dlthub.com/docs/pipelines/soundcloud/load-data-with-python-from-soundcloud-to-filesystem-local)
- [Load data from Sentry to Google Cloud Storage in python with dlt](https://dlthub.com/docs/pipelines/sentry/load-data-with-python-from-sentry-to-filesystem-gcs)
- [Load data from Azure Cloud Storage to MotherDuck in python with dlt](https://dlthub.com/docs/pipelines/filesystem-az/load-data-with-python-from-filesystem-az-to-motherduck)
- [Load data from The Local Filesystem to AWS S3 in python with dlt](https://dlthub.com/docs/pipelines/filesystem-local/load-data-with-python-from-filesystem-local-to-filesystem-aws)
- [Load data from Google Cloud Storage to Snowflake in python with dlt](https://dlthub.com/docs/pipelines/filesystem-gcs/load-data-with-python-from-filesystem-gcs-to-snowflake)
- [Load data from Klaviyo to The Local Filesystem in python with dlt](https://dlthub.com/docs/pipelines/klaviyo/load-data-with-python-from-klaviyo-to-filesystem-local)
- [Load data from AWS S3 to Google Cloud Storage in python with dlt](https://dlthub.com/docs/pipelines/filesystem-aws/load-data-with-python-from-filesystem-aws-to-filesystem-gcs)
