Azure Blob Storage Input

Use the Azure Blob Storage input to read content from files stored in containers that reside in your Azure cloud. The input can be configured to work with or without polling. Currently, if polling is disabled, it performs a one-time pass, lists the file contents and ends the process. Polling is generally recommended for most cases, even though it can get expensive when dealing with a very large number of files.

To mitigate errors and ensure a stable processing environment, this input employs the following features:

  1. If an outage occurs while Azure blob containers are being processed, the process can resume after the last file it processed and successfully saved the state for.
  2. If errors occur for certain files, they are logged appropriately, and the rest of the files continue to be processed normally.
  3. If a major error occurs that stops the main thread, logs describing that error are generated.

NOTE: Currently only JSON and NDJSON blob/file formats are supported. Blobs/files may also be gzip compressed. For authentication, shared access keys and connection strings are currently supported.

A sample configuration with a detailed explanation of each field is given below:

filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  containers:
  - name: container_1
    max_workers: 3
    poll: true
    poll_interval: 10s
  - name: container_2
    max_workers: 3
    poll: true
    poll_interval: 10s

Explanation: The configuration above describes a basic blob storage config with two containers named container_1 and container_2. Each of these containers has its own attributes such as name, max_workers, poll and poll_interval. These attributes are explained in detail below. For now, let’s try to understand how this config works.

For the Azure Blob Storage input to identify the files it needs to read and process, the container names must be specified. We can have as many containers as we deem fit. We can also configure the attributes max_workers, poll and poll_interval at the root level, in which case they apply to every container that does not specify them explicitly.

If the attributes max_workers, poll and poll_interval are specified at the root level, they can still be overridden at the container level with different values, which offers extensive flexibility and customization. The examples below show this behaviour.

On receiving this config, the Azure Blob Storage input connects to the service and retrieves a ServiceClient using the given account_name and auth.shared_credentials.account_key. It then spawns two main goroutines, one for each container. Each of these routines (threads) initializes a scheduler, which in turn uses the max_workers value to initialize an in-memory worker pool (thread pool) with 3 workers available. That equates to two instances of a worker pool, one per container, each having 3 workers. These workers are responsible for performing jobs that process a file (in this case, reading and outputting the contents of a file).

The scheduler is responsible for scheduling jobs. At each iteration, it uses the maximum number of available workers in the pool to decide how many files to retrieve and process, which keeps work distribution efficient. The scheduler uses the poll_interval attribute value to decide how long to wait after each iteration. Each iteration consists of processing a certain number of files, decided by the maximum available workers value.
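
As a rough illustration of the scheduling described above, the following Python sketch models the worker pool of a single container. It is not the input's actual Go implementation: process_blob, split_into_batches and run_container are hypothetical names, and Python's ThreadPoolExecutor stands in for the pool of goroutines.

```python
from concurrent.futures import ThreadPoolExecutor


def process_blob(name):
    """Hypothetical job: stands in for reading one blob and publishing it."""
    return f"processed {name}"


def split_into_batches(blob_names, max_workers):
    """Each scheduler iteration takes at most max_workers blobs."""
    return [
        blob_names[i:i + max_workers]
        for i in range(0, len(blob_names), max_workers)
    ]


def run_container(blob_names, max_workers):
    """Process every blob of one container with a pool of max_workers workers."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for batch in split_into_batches(blob_names, max_workers):
            # One iteration: hand one blob to each available worker.
            results.extend(pool.map(process_blob, batch))
            # A polling scheduler would wait poll_interval here before
            # starting the next iteration.
    return results


if __name__ == "__main__":
    blobs = [f"data_{i}.json" for i in range(1, 8)]
    print(split_into_batches(blobs, 3))
    print(run_container(blobs, 3))
```

With max_workers set to 3 and seven blobs, the sketch runs three iterations of sizes 3, 3 and 1. In the sample configuration, each of the two containers would get its own independent pool of this kind.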

A sample response:

{
    "@timestamp": "2022-07-25T07:00:18.544Z",
    "@metadata": {
        "beat": "filebeat",
        "type": "_doc",
        "version": "8.4.0",
        "_id": "beatscontainer-data_3.json-worker-1"
    },
    "message": "{\n    \"id\": 3,\n    \"title\": \"Samsung Universe 9\",\n    \"description\": \"Samsung's new variant which goes beyond Galaxy to the Universe\",\n    \"price\": 1249,\n    \"discountPercentage\": 15.46,\n    \"rating\": 4.09,\n    \"stock\": 36,\n    \"brand\": \"Samsung\",\n    \"category\": \"smartphones\",\n    \"thumbnail\": \"https://dummyjson.com/image/i/products/3/thumbnail.jpg\",\n    \"images\": [\n        \"https://dummyjson.com/image/i/products/3/1.jpg\"\n    ]\n}",
    "cloud": {
        "provider": "azure"
    },
    "input": {
        "type": "azure-blob-storage"
    },
    "log": {
        "offset": 200,
        "file": {
            "path": "https://beatsblobstorage1.blob.core.windows.net/beatscontainer/data_3.json"
        }
    },
    "azure": {
        "storage": {
            "container": {
                "name": "beatscontainer"
            },
            "blob": {
                "content_type": "application/json",
                "name": "data_3.json"
            }
        }
    },
    "event": {
        "kind": "publish_data"
    }
}

As we can see from the response above, the message field contains the original stringified data.

Some of the key attributes are as follows:

  1. message: Original stringified blob data.
  2. log.file.path: Path of the blob in Azure cloud.
  3. azure.storage.container.name: Name of the container from which the file has been read.
  4. azure.storage.blob.name: Name of the file/blob which has been read.
  5. azure.storage.blob.content_type: Content type of the file/blob. You can find the supported content types here.
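
Because the message field holds the blob content as a string, a consumer has to parse it to get the original JSON document back. The sketch below shows one way to do that in Python. The event is a trimmed copy of the sample response (only a few fields, with a shortened message), and decode_message is a hypothetical helper name.

```python
import json

# Trimmed copy of the sample event; the message payload is shortened.
event = {
    "message": '{\n    "id": 3,\n    "title": "Samsung Universe 9",\n    "price": 1249\n}',
    "log": {
        "file": {
            "path": "https://beatsblobstorage1.blob.core.windows.net/beatscontainer/data_3.json"
        }
    },
    "azure": {
        "storage": {
            "container": {"name": "beatscontainer"},
            "blob": {"name": "data_3.json", "content_type": "application/json"},
        }
    },
}


def decode_message(evt):
    """Parse the stringified blob data back into a Python object."""
    return json.loads(evt["message"])


if __name__ == "__main__":
    original = decode_message(event)
    print(original["title"], "from", event["azure"]["storage"]["blob"]["name"])
```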

Now let’s explore the configuration attributes in more detail.

Supported attributes:

account_name

This attribute is required. It is used internally for authentication and for creating the service clients and blob clients that carry out processing.

auth.shared_credentials.account_key

This attribute contains the access key, found under the Access keys section on Azure Cloud, under the respective storage account. A single storage account can contain multiple containers, and they will all use this common access key.

auth.connection_string.uri

This attribute contains the connection string, found under the Access keys section on Azure Cloud, under the respective storage account. A single storage account can contain multiple containers, and they will all use this common connection string.

Only one of auth.shared_credentials.account_key or auth.connection_string.uri needs to be specified for authentication. If both attributes are specified, the one that occurs first in the configuration will be used.

storage_url

Use this attribute to specify a custom storage URL if required. By default it points to Azure cloud storage. Only use this if there is a specific need to connect to a different environment where blob storage is available.

URL format: {{protocol}}://{{account_name}}.{{storage_uri}}. This attribute resides at the root level of the config and not inside any container block.
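
To make the URL template concrete, the short Python sketch below fills in its three placeholders. build_storage_url is a hypothetical helper, and the values are taken from the blob path in the sample response, which suggests that blob.core.windows.net is the storage_uri of the default Azure endpoint.

```python
def build_storage_url(protocol, account_name, storage_uri):
    """Fill in the {{protocol}}://{{account_name}}.{{storage_uri}} template."""
    return f"{protocol}://{account_name}.{storage_uri}"


if __name__ == "__main__":
    print(build_storage_url("https", "beatsblobstorage1", "blob.core.windows.net"))
```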

containers

This attribute contains the details of each container, such as name, max_workers, poll and poll_interval. The name attribute is specific to a container because it gives the container name, while the fields max_workers, poll and poll_interval can exist at both the container level and the root level. This attribute is internally represented as an array, so we can add as many containers as we require.

name

This is a specific subfield of a container. It specifies the container name.

max_workers

This attribute defines the maximum number of workers (goroutines / lightweight threads) allocated in the worker pool (thread pool) for processing jobs that read the contents of files. A larger number of workers gives greater concurrency. Due to internal SDK constraints, there is an upper cap of 5000 workers per container. This attribute can be specified both at the root level of the configuration and at the container level. The container level values will always take priority and override the root level values if both are specified.

poll

This attribute informs the scheduler whether to keep polling for new files or not. The default value is false, so the input will not keep polling unless this is explicitly set to true. This attribute can be specified both at the root level of the configuration and at the container level. The container level values will always take priority and override the root level values if both are specified.

poll_interval

This attribute defines the maximum amount of time after which the internal scheduler will make the polling call for the next set of blobs/files. It can be defined in the following formats: {{x}}s, {{x}}m, {{x}}h, where s = seconds, m = minutes and h = hours. The value {{x}} can be any number we wish. Example: 10s means polling occurs every 10 seconds. If no value is specified, it defaults to 300 seconds. This attribute can be specified both at the root level of the configuration and at the container level. The container level values will always take priority and override the root level values if both are specified.
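
The interval formats described above can be sketched as a small Python parser. This is only an illustration that handles the three documented forms ({{x}}s, {{x}}m, {{x}}h) plus the 300 second default. Filebeat's own duration parsing may accept more forms, and poll_interval_seconds is a hypothetical name.

```python
import re

DEFAULT_POLL_INTERVAL_SECONDS = 300
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def poll_interval_seconds(value=None):
    """Convert a poll_interval such as '10s', '5m' or '1h' to seconds."""
    if value is None:
        # No value configured: fall back to the documented default.
        return DEFAULT_POLL_INTERVAL_SECONDS
    match = re.fullmatch(r"(\d+)([smh])", value.strip())
    if match is None:
        raise ValueError(f"unsupported poll_interval: {value!r}")
    amount, unit = match.groups()
    return int(amount) * UNIT_SECONDS[unit]


if __name__ == "__main__":
    for raw in ("10s", "15s", "5m", "1h", None):
        print(raw, "->", poll_interval_seconds(raw), "seconds")
```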

file_selectors

If the Azure blob storage container will have blobs that correspond to files that Filebeat shouldn’t process, file_selectors can be used to limit the files that are downloaded. This is a list of selectors which are based on a regex pattern. The regex should match the blob name or should be a part of the blob name (ideally a prefix). The regex syntax is the same as used in the Go programming language. Files that don’t match any configured regex won’t be processed. This attribute can be specified both at the root level of the configuration and at the container level. The container level values will always take priority and override the root level values if both are specified.

filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  containers:
  - name: container_1
    file_selectors:
    - regex: '/Monitoring/'
    - regex: 'docs/'
    - regex: '/Security-Logs/'
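
The effect of the selectors above can be approximated with the Python sketch below. It assumes a selector matches when its pattern is found anywhere in the blob name, and that a container with no selectors processes every blob. Python's re module is used as a stand-in for Go's regex syntax, which behaves the same for the literal patterns shown here. is_selected and the blob names are hypothetical.

```python
import re


def is_selected(blob_name, selectors):
    """Return True if the blob name matches at least one selector regex."""
    if not selectors:
        # Assumption: without selectors, nothing is filtered out.
        return True
    return any(re.search(pattern, blob_name) for pattern in selectors)


if __name__ == "__main__":
    selectors = ["/Monitoring/", "docs/", "/Security-Logs/"]
    names = [
        "app/Monitoring/cpu.json",
        "docs/readme.json",
        "logs/Security-Logs/auth.ndjson",
        "Monitoring/cpu.json",
        "images/photo.json",
    ]
    for name in names:
        print(name, "->", is_selected(name, selectors))
```

Note that under these assumptions Monitoring/cpu.json is not selected, because the pattern /Monitoring/ requires a slash before the folder name.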

expand_event_list_from_field

If the file-set using this input expects to receive multiple messages bundled under a specific field or an array of objects, then the config option expand_event_list_from_field can be specified. This setting splits the messages under the group value into separate events. For example, if you have logs in JSON format and the events are found under the JSON object "Records", you can set expand_event_list_from_field to "Records" to split them into separate events. This attribute can be specified both at the root level of the configuration and at the container level. The container level values will always take priority and override the root level values if both are specified.

{
    "Records": [
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:51:00Z",
            "region": "us-east-1",
            "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE"
        },
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:52:00Z",
            "region": "us-east-1",
            "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE"
        }
    ]
}

filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  containers:
  - name: container_1
    expand_event_list_from_field: Records

This attribute is only applicable to JSON file formats. You do not need to specify this attribute if the file has an array of objects at the root level, because root level arrays of objects are automatically split into separate events. If failures occur or the input crashes due to some unexpected error, processing will resume from the last successfully processed file/blob.
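
The splitting behaviour can be pictured with the following Python sketch. It is a simplified model rather than the input's own code: expand_events is a hypothetical function, and the blob is a shortened version of the "Records" document shown earlier.

```python
import json


def expand_events(blob_text, expand_event_list_from_field=None):
    """Return the list of events contained in one JSON blob."""
    doc = json.loads(blob_text)
    if isinstance(doc, list):
        # A root level array is split automatically, no setting needed.
        return doc
    if isinstance(doc, dict) and expand_event_list_from_field:
        records = doc.get(expand_event_list_from_field)
        if isinstance(records, list):
            # Each element under the configured field becomes its own event.
            return records
    # Otherwise the whole document is treated as a single event.
    return [doc]


blob = """
{
    "Records": [
        {"eventVersion": "1.07", "eventTime": "2019-11-14T00:51:00Z"},
        {"eventVersion": "1.07", "eventTime": "2019-11-14T00:52:00Z"}
    ]
}
"""

if __name__ == "__main__":
    for event in expand_events(blob, "Records"):
        print(event["eventTime"])
```

Without the field name, the same blob yields a single event that still contains the whole Records array.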

timestamp_epoch

This attribute can be used to filter out files/blobs which have a timestamp older than the specified value. The value of this attribute should be in Unix epoch (seconds) format. The timestamp value is compared with the LastModified timestamp obtained from the blob metadata. This attribute can be specified both at the root level of the configuration and at the container level. The container level values will always take priority and override the root level values if both are specified.

filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  containers:
  - name: container_1
    timestamp_epoch: 1627233600
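
The comparison behind timestamp_epoch can be sketched in Python as follows. The blob names and LastModified values are made up for illustration, newer_than is a hypothetical helper, and treating a blob whose timestamp equals the configured value as "not older" is an assumption.

```python
from datetime import datetime, timezone


def newer_than(blobs, timestamp_epoch):
    """Keep blobs whose LastModified time is not older than timestamp_epoch."""
    return [
        name
        for name, last_modified in blobs
        if last_modified.timestamp() >= timestamp_epoch
    ]


# Hypothetical listing: (blob name, LastModified) pairs in UTC.
blobs = [
    ("old.json", datetime(2021, 7, 1, tzinfo=timezone.utc)),
    ("new.json", datetime(2022, 7, 25, tzinfo=timezone.utc)),
]

if __name__ == "__main__":
    cutoff = 1627233600  # 2021-07-25T17:20:00Z
    print(datetime.fromtimestamp(cutoff, tz=timezone.utc).isoformat())
    print(newer_than(blobs, cutoff))
```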

The sample configs below explain the container level overriding of attributes a bit further:

CASE 1:

Here container_1 uses the root level attributes while container_2 overrides the values:

filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  max_workers: 10
  poll: true
  poll_interval: 15s
  containers:
  - name: container_1
  - name: container_2
    max_workers: 3
    poll: true
    poll_interval: 10s

Explanation: In this configuration, container_1 has none of the sub attributes max_workers, poll and poll_interval defined. It inherits the values for these fields from the root level, which are max_workers = 10, poll = true and poll_interval = 15 seconds. However, container_2 has these fields defined, and it will use those values instead of the root values.

CASE 2:

Here both container_1 and container_2 override the root values:

filebeat.inputs:
  - type: azure-blob-storage
    id: my-azureblobstorage-id
    enabled: true
    account_name: some_account
    auth.shared_credentials.account_key: some_key
    max_workers: 10
    poll: true
    poll_interval: 15s
    containers:
    - name: container_1
      max_workers: 5
      poll: true
      poll_interval: 10s
    - name: container_2
      max_workers: 5
      poll: true
      poll_interval: 10s

Explanation: In this configuration, even though we have specified max_workers = 10, poll = true and poll_interval = 15s at the root level, both containers will override these values with their own respective values, which are defined as part of their sub attributes.
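
The override rule used in both cases can be summarised with the short Python sketch below, which reuses the values from CASE 1. It is a model of the documented behaviour, not the input's own code, and effective_settings and OVERRIDABLE_KEYS are hypothetical names.

```python
# Attributes that may appear at both the root level and the container level.
OVERRIDABLE_KEYS = (
    "max_workers",
    "poll",
    "poll_interval",
    "file_selectors",
    "expand_event_list_from_field",
    "timestamp_epoch",
)


def effective_settings(root, container):
    """Start from the root level values, then let the container override them."""
    settings = {key: root[key] for key in OVERRIDABLE_KEYS if key in root}
    settings.update(
        {key: container[key] for key in OVERRIDABLE_KEYS if key in container}
    )
    return settings


if __name__ == "__main__":
    root = {"max_workers": 10, "poll": True, "poll_interval": "15s"}
    containers = [
        {"name": "container_1"},
        {"name": "container_2", "max_workers": 3, "poll": True, "poll_interval": "10s"},
    ]
    for container in containers:
        print(container["name"], effective_settings(root, container))
```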

Any feedback that helps us further optimize this input is welcome. Please feel free to open a GitHub issue for any bugs or feature requests.