Google Cloud Storage Input

Beta [Input]

Use the Google Cloud Storage input to read content from files stored in buckets that reside in your Google Cloud project. The input can be configured to work with or without polling, though currently, if polling is disabled, it only performs a one-time pass: it lists the file contents, processes them, and ends the process. Polling is generally recommended for most cases, even though it can get expensive when dealing with a very large number of files.

To mitigate errors and ensure a stable processing environment, this input employs the following features:

  1. When processing Google Cloud buckets, if there is a sudden outage, the process is able to resume from the point after the last file it processed and successfully saved the state for.
  2. If any errors occur for certain files, they are logged appropriately, but the rest of the files continue to be processed normally.
  3. If any major error occurs which stops the main thread, appropriate logs are generated describing said error.

Currently only JSON and NDJSON are supported object/file formats. Objects/files may also be gzip compressed. "JSON credential keys" and "credential files" are supported authentication types. If an array is present as the root object for an object/file, it is automatically split into individual objects and processed. If a download for a file/object fails or gets interrupted, the download is retried up to 2 times. This is currently not user configurable.

A sample configuration with a detailed explanation for each field is given below:

filebeat.inputs:
- type: gcs
  id: my-gcs-id
  enabled: true
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  parse_json: true
  buckets:
  - name: gcs-test-new
    max_workers: 3
    poll: true
    poll_interval: 15s
    bucket_timeout: 60s
  - name: gcs-test-old
    max_workers: 3
    poll: true
    poll_interval: 10s
    bucket_timeout: 30s

Explanation: The configuration above describes a basic gcs configuration with two buckets named gcs-test-new and gcs-test-old. Each of these buckets has its own attributes such as name, max_workers, poll, poll_interval and bucket_timeout. These attributes are explained in detail below. For now, let's try to understand how this configuration works.

For the Google Cloud Storage input to identify the files it needs to read and process, the bucket names must be specified. You can configure as many buckets as you deem fit. The attributes max_workers, poll, poll_interval and bucket_timeout can also be configured at the root level, and they are then applied to all buckets which do not specify any of these attributes explicitly.

If the attributes max_workers, poll, poll_interval and bucket_timeout are specified at the root level, they can still be overridden at the bucket level with different values, thus offering extensive flexibility and customization. The examples below show this behaviour.

On receiving this configuration, the Google Cloud Storage input connects to the service and retrieves a storage client using the given bucket_name and auth.credentials_file, then spawns two main go-routines, one for each bucket. Each of these routines (threads) then initializes a scheduler, which in turn uses the max_workers value to initialize an in-memory worker pool (thread pool) with 3 available workers. In effect, that equates to two instances of a worker pool, one per bucket, each having 3 workers. These workers are responsible for performing jobs that process a file (in this case, reading and outputting the contents of a file).

The scheduler is responsible for scheduling jobs and, at each iteration, uses the maximum number of available workers in the pool to decide the number of files to retrieve and process. This keeps work distribution efficient. The scheduler uses the poll_interval attribute value to decide how long to wait after each iteration. The bucket_timeout value is used to time out calls to the bucket list API if they exceed the given value. Each iteration consists of processing a certain number of files, determined by the maximum number of available workers.
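
For instance, the annotated single-bucket sketch below (values are illustrative, not defaults) maps each attribute to the scheduling behaviour described above:

filebeat.inputs:
- type: gcs
  id: my-gcs-id
  enabled: true
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  buckets:
  - name: gcs-test-new
    max_workers: 3      # up to 3 files are fetched and processed concurrently in each iteration
    poll: true          # keep scheduling new iterations instead of stopping after a single pass
    poll_interval: 15s  # wait 15 seconds after an iteration before listing the bucket again
    bucket_timeout: 60s # time out bucket list/read calls that take longer than 60 seconds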

A Sample Response:

{
  "@timestamp": "2022-09-01T13:54:24.588Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.5.0",
    "_id": "gcs-test-new-data_3.json-worker-1"
  },
  "log": {
    "offset": 200,
    "file": {
      "path": "gs://gcs-test-new/data_3.json"
    }
  },
  "input": {
    "type": "gcs"
  },
  "message": "{\n    \"id\": 1,\n    \"title\": \"iPhone 9\",\n    \"description\": \"An apple mobile which is nothing like apple\",\n    \"price\": 549,\n    \"discountPercentage\": 12.96,\n    \"rating\": 4.69,\n    \"stock\": 94,\n    \"brand\": \"Apple\",\n    \"category\": \"smartphones\",\n    \"thumbnail\": \"https://dummyjson.com/image/i/products/1/thumbnail.jpg\",\n    \"images\": [\n        \"https://dummyjson.com/image/i/products/1/1.jpg\",\n        \"https://dummyjson.com/image/i/products/1/2.jpg\",\n        \"https://dummyjson.com/image/i/products/1/3.jpg\",\n        \"https://dummyjson.com/image/i/products/1/4.jpg\",\n        \"https://dummyjson.com/image/i/products/1/thumbnail.jpg\"\n    ]\n}\n",
  "cloud": {
    "provider": "goole cloud"
  },
  "gcs": {
    "storage": {
      "bucket": {
        "name": "gcs-test-new"
      },
      "object": {
        "name": "data_3.json",
        "content_type": "application/json",
        "json_data": [
          {
            "id": 1,
            "discountPercentage": 12.96,
            "rating": 4.69,
            "brand": "Apple",
            "price": 549,
            "category": "smartphones",
            "thumbnail": "https://dummyjson.com/image/i/products/1/thumbnail.jpg",
            "description": "An apple mobile which is nothing like apple",
            "title": "iPhone 9",
            "stock": 94,
            "images": [
              "https://dummyjson.com/image/i/products/1/1.jpg",
              "https://dummyjson.com/image/i/products/1/2.jpg",
              "https://dummyjson.com/image/i/products/1/3.jpg",
              "https://dummyjson.com/image/i/products/1/4.jpg",
              "https://dummyjson.com/image/i/products/1/thumbnail.jpg"
            ]
          }
        ]
      }
    }
  },
  "event": {
    "kind": "publish_data"
  }
}

As we can see from the response above, the message field contains the original stringified data, while the gcs.storage.object.json_data field contains the objectified data.

Some of the key attributes are as follows:

  1. message: The original stringified object data.
  2. log.file.path: Path of the object in Google Cloud Storage.
  3. gcs.storage.bucket.name: Name of the bucket from which the file has been read.
  4. gcs.storage.object.name: Name of the file/object which has been read.
  5. gcs.storage.object.content_type: Content type of the file/object.
  6. gcs.storage.object.json_data: Objectified JSON file data, representing the contents of the file.

Now let’s explore the configuration attributes in a bit more detail.

Supported Attributes:

project_id

This attribute is required for various internal operations such as authentication, creating storage clients, and logging, which are used internally for various processing purposes.

auth.credentials_json.account_key

This attribute contains the JSON service account credentials string, which can be generated from the Google Cloud console (ref: https://cloud.google.com/iam/docs/creating-managing-service-account-keys) for the respective service account. A single project can contain multiple buckets, and they will all use this common service account key.
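
For illustration, a minimal sketch of inline key authentication might look like the following (the account key value is a truncated placeholder, not a real credential):

filebeat.inputs:
- type: gcs
  id: my-gcs-id
  enabled: true
  project_id: my_project_id
  auth.credentials_json.account_key: '{"type": "service_account", "project_id": "my_project_id", "private_key_id": "...", "private_key": "...", "client_email": "...", "client_id": "..."}'
  buckets:
  - name: gcs-test-new
    max_workers: 3
    poll: true
    poll_interval: 15s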

auth.credentials_file.path

This attribute contains the path to the service account credentials file, which can be generated from the Google Cloud console (ref: https://cloud.google.com/iam/docs/creating-managing-service-account-keys) for the respective service account. A single project can contain multiple buckets, and they will all use this common service account credentials file.

Only one of auth.credentials_json.account_key or auth.credentials_file.path needs to be specified for authentication. If both attributes are specified, the one that occurs first in the configuration is used.

buckets

This attribute contains the details about a specific bucket, such as name, max_workers, poll, poll_interval and bucket_timeout. The attribute name is specific to a bucket as it describes the bucket name, while the fields max_workers, poll, poll_interval and bucket_timeout can exist both at the bucket level and at the root level. This attribute is internally represented as an array, so you can add as many buckets as you require.

name

This is a specific subfield of a bucket. It specifies the bucket name.

bucket_timeout

This attribute defines the maximum amount of time after which a bucket operation will give up and stop if no response is received (for example, reading a file or listing files). It can be defined in the following formats: {{x}}s, {{x}}m, {{x}}h, where s = seconds, m = minutes and h = hours. The value {{x}} can be anything you wish. If no value is specified, it defaults to 50 seconds. This attribute can be specified both at the root level of the configuration and at the bucket level. The bucket level values always take priority and override the root level values if both are specified.
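
As an illustrative sketch (values chosen arbitrarily), a root-level bucket_timeout can be combined with a bucket-level override:

filebeat.inputs:
- type: gcs
  id: my-gcs-id
  enabled: true
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  bucket_timeout: 30s      # applies to every bucket that does not override it
  buckets:
  - name: gcs-test-new
    bucket_timeout: 120s   # this bucket allows bucket calls to run for up to 120 seconds
  - name: gcs-test-old     # inherits the root-level 30s bucket_timeout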

max_workers

This attribute defines the maximum number of workers (go-routines / lightweight threads) allocated in the worker pool (thread pool) for processing jobs that read the contents of files. More workers equals a greater amount of concurrency. There is an upper cap of 5000 workers per bucket due to internal SDK constraints. This attribute can be specified both at the root level of the configuration and at the bucket level. The bucket level values always take priority and override the root level values if both are specified.

poll

This attribute informs the scheduler whether to keep polling for new files or not. The default value is false, so the input will not keep polling unless this is explicitly enabled. This attribute can be specified both at the root level of the configuration and at the bucket level. The bucket level values always take priority and override the root level values if both are specified.
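
For example, a sketch of a one-time run, where the input lists and processes the current bucket contents once and then ends (poll is shown explicitly here even though false is the default):

filebeat.inputs:
- type: gcs
  id: my-gcs-id
  enabled: true
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  buckets:
  - name: gcs-test-new
    max_workers: 3
    poll: false   # perform a single pass over the bucket and then stop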

poll_interval

This attribute defines the maximum amount of time after which the internal scheduler makes the polling call for the next set of objects/files. It can be defined in the following formats: {{x}}s, {{x}}m, {{x}}h, where s = seconds, m = minutes and h = hours. The value {{x}} can be anything you wish. Example: 10s means polling occurs every 10 seconds. If no value is specified, it defaults to 300 seconds. This attribute can be specified both at the root level of the configuration and at the bucket level. The bucket level values always take priority and override the root level values if both are specified.

parse_json

This attribute informs the publisher whether to parse and objectify JSON data or not. By default this is set to false, since it can get expensive when dealing with highly nested JSON data. If this is set to false, the gcs.storage.object.json_data field in the response will contain an empty array. This attribute is only applicable to JSON objects and has no effect on other types of objects. This attribute can be specified both at the root level of the configuration and at the bucket level. The bucket level values always take priority and override the root level values if both are specified.
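
For example, the sketch below (illustrative values) enables JSON parsing only for one bucket, so only events from that bucket populate gcs.storage.object.json_data:

filebeat.inputs:
- type: gcs
  id: my-gcs-id
  enabled: true
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  parse_json: false          # root-level default: json_data remains an empty array
  poll: true
  poll_interval: 15s
  buckets:
  - name: gcs-test-new
    parse_json: true         # events from this bucket include the objectified JSON
  - name: gcs-test-old       # inherits parse_json: false from the root level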

The sample configurations below explain the bucket-level overriding of attributes a bit further:

CASE - 1 :

Here bucket_1 uses the root-level attributes while bucket_2 overrides the values:

filebeat.inputs:
- type: gcs
  id: my-gcs-id
  enabled: true
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  max_workers: 10
  poll: true
  poll_interval: 15s
  buckets:
  - name: bucket_1
  - name: bucket_2
    max_workers: 3
    poll: true
    poll_interval: 10s

Explanation: In this configuration bucket_1 has none of the attributes max_workers, poll and poll_interval defined. It inherits the values for these fields from the root level, which are max_workers = 10, poll = true and poll_interval = 15 seconds. However, bucket_2 has these fields defined and will use those values instead of the root values.

CASE - 2 :

Here both bucket_1 and bucket_2 override the root values:

filebeat.inputs:
- type: gcs
  id: my-gcs-id
  enabled: true
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  max_workers: 10
  poll: true
  poll_interval: 15s
  buckets:
  - name: bucket_1
    max_workers: 5
    poll: true
    poll_interval: 10s
  - name: bucket_2
    max_workers: 5
    poll: true
    poll_interval: 10s

Explanation: In this configuration, even though max_workers = 10, poll = true and poll_interval = 15s are specified at the root level, both buckets will override these values with their own respective values, which are defined as part of their sub-attributes.

Since this is an experimental (beta) input, any feedback is welcome and will help us optimize and improve it going forward.