Stephen Brown

Using NLP and Pattern Matching to Detect, Assess, and Redact PII in Logs -  Part 1

How to detect and assess PII in your logs using Elasticsearch and NLP


Introduction

The prevalence of high-entropy logs in distributed systems has significantly raised the risk of PII (Personally Identifiable Information) seeping into our logs, which can result in security and compliance issues. This 2-part blog delves into the crucial task of identifying and managing this issue using the Elastic Stack. We will explore using NLP (Natural Language Processing) and Pattern matching to detect, assess, and, where feasible, redact PII from logs that are being ingested into Elasticsearch.

In Part 1 of this blog, we will cover the following:

  • Review the techniques and tools we have available to manage PII in our logs
  • Understand the roles of NLP / NER in PII detection
  • Build a composable processing pipeline to detect and assess PII
  • Sample logs and run them through the NER Model
  • Assess the results of the NER Model

In Part 2 of this blog, we will cover the following:

  • Redact PII using NER and the redact processor
  • Apply field-level security to control access to the un-redacted data
  • Enhance the dashboards and alerts
  • Production considerations and scaling
  • How to run these processes on incoming or historical data

Here is the overall flow we will construct over the 2 blogs:

All code for this exercise can be found at: https://github.com/bvader/elastic-pii.

Tools and Techniques

There are four general capabilities that we will use for this exercise.

  • Named Entity Recognition Detection (NER)
  • Pattern Matching Detection
  • Log Sampling
  • Ingest Pipelines as Composable Processing

Named Entity Recognition (NER) Detection

NER is a sub-task of Natural Language Processing (NLP) that involves identifying and categorizing named entities in unstructured text into predefined categories such as:

  • Person: Names of individuals, including celebrities, politicians, and historical figures.
  • Organization: Names of companies, institutions, and organizations.
  • Location: Geographic locations, including cities, countries, and landmarks.
  • Event: Names of events, including conferences, meetings, and festivals.

For our PII use case, we will use the base BERT NER model, bert-base-NER, which can be downloaded from Hugging Face and loaded into Elasticsearch as a trained model.

Important Note: NER / NLP models are CPU-intensive and expensive to run at scale; thus, we will want to employ a sampling technique to understand the risk in our logs without sending the full log volume through the NER model. We will discuss the performance and scaling of the NER model in part 2 of the blog.

Pattern Matching Detection

In addition to NER, regex pattern matching is a powerful tool for detecting and redacting PII that follows common patterns. The Elasticsearch redact processor is built for this use case.
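As a rough illustration of the idea (not the redact processor itself, which uses Grok patterns inside Elasticsearch), here is a minimal Python sketch of pattern-based redaction. The patterns are simplified examples, not production-grade PII matchers:

```python
import re

# Illustrative patterns only; real PII patterns need far more care
PATTERNS = {
    "EMAIL": re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"),
    "PHONE": re.compile(r"\b\d{3}-\d{3}-\d{4}\b"),
}

def redact(text: str) -> str:
    # Replace every match of each pattern with a labeled placeholder
    for label, pattern in PATTERNS.items():
        text = pattern.sub(f"<REDACTED-{label}>", text)
    return text

print(redact("Call 726-632-0527 or mail david59@burgess.net"))
# Call <REDACTED-PHONE> or mail <REDACTED-EMAIL>
```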

Log Sampling

Considering the performance implications of NER and the fact that we may be ingesting a large volume of logs into Elasticsearch, it makes sense to sample our incoming logs. We will build a simple log sampler to accomplish this.
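The sampling approach can be sketched in a few lines of Python. This mirrors the logic the logs-sampler ingest pipeline implements with set processors and a Painless script, using the same 0-to-10000 rate scale:

```python
import random

# Rate of 0 keeps no logs, 10000 keeps all, giving ~0.01% precision
def is_sampled(sample_rate: int, max_rate: int = 10000) -> bool:
    return random.randint(0, max_rate - 1) < sample_rate

# With sample_rate=1000, roughly 10% of logs are marked as sampled
hits = sum(is_sampled(1000) for _ in range(100_000))
print(hits / 100_000)  # ~0.10
```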

Ingest Pipelines as Composable Processing

We will create several pipelines, each focusing on a specific capability and a main ingest pipeline to orchestrate the overall process.

Building the Processing Flow

Logs Sampling + Composable Ingest Pipelines

The first thing we will do is set up a sampler for our logs. This ingest pipeline simply takes a sampling rate between 0 (no logs) and 10000 (all logs), which allows sampling rates as low as ~0.01%, and marks the sampled logs with sample.sampled: true. Further processing of the logs will be driven by the value of sample.sampled. The sample.sample_rate can be set here or "passed in" from the orchestration pipeline.

The commands should be run from Kibana -> Dev Tools.

The code for the following three sections can be found here.

logs-sampler pipeline code - click to open/close
# logs-sampler pipeline - part 1
DELETE _ingest/pipeline/logs-sampler
PUT _ingest/pipeline/logs-sampler
{
  "processors": [
    {
      "set": {
        "description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
        "if": "ctx.sample.sample_rate == null",
        "field": "sample.sample_rate",
        "value": 10000
      }
    },
    {
      "set": {
        "description": "Determine if keeping unsampled docs",
        "if": "ctx.sample.keep_unsampled == null",
        "field": "sample.keep_unsampled",
        "value": true
      }
    },
    {
      "set": {
        "field": "sample.sampled",
        "value": false
      }
    },
    {
      "script": {
        "source": """ Random r = new Random();
        ctx.sample.random = r.nextInt(params.max); """,
        "params": {
          "max": 10000
        }
      }
    },
    {
      "set": {
        "if": "ctx.sample.random <= ctx.sample.sample_rate",
        "field": "sample.sampled",
        "value": true
      }
    },
    {
      "drop": {
         "description": "Drop unsampled document if applicable",
        "if": "ctx.sample.keep_unsampled == false && ctx.sample.sampled == false"
      }
    }
  ]
}

Now, let's test the logs sampler. We will build the first part of the composable pipeline. We will be sending logs to the logs-generic-default data stream. With that in mind, we will create the logs@custom ingest pipeline, which is called automatically by the logs data stream framework for customization. We will add one additional level of abstraction so that you can apply this PII processing to other data streams.

Next, we will create the process-pii pipeline. This is the core processing pipeline where we will orchestrate the PII processing component pipelines. In this first step, we will simply apply the sampling logic. Note that we are setting the sampling rate to 1000, which is equivalent to 10% of the logs.

process-pii pipeline code - click to open/close
# Process PII pipeline - part 1
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
  "processors": [
    {
      "set": {
        "description": "Set true if enabling sampling, otherwise false",
        "field": "sample.enabled",
        "value": true
      }
    },
    {
      "set": {
        "description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
        "field": "sample.sample_rate",
        "value": 1000
      }
    },
    {
      "set": {
        "description": "Set to false if you want to drop unsampled data, handy for reindexing historical data",
        "field": "sample.keep_unsampled",
        "value": true
      }
    },
    {
      "pipeline": {
        "if": "ctx.sample.enabled == true",
        "name": "logs-sampler",
        "ignore_failure": true
      }
    }
  ]
}

Finally, we create the logs@custom pipeline, which will simply call our process-pii pipeline based on the correct data_stream.dataset.

logs@custom pipeline code - click to open/close
# logs@custom pipeline - part 1
DELETE _ingest/pipeline/logs@custom
PUT _ingest/pipeline/logs@custom
{
  "processors": [
    {
      "set": {
        "field": "pipelinetoplevel",
        "value": "logs@custom"
      }
    },
    {
      "set": {
        "field": "pipelinetoplevelinfo",
        "value": "{{{data_stream.dataset}}}"
      }
    },
    {
      "pipeline": {
        "description" : "Call the process_pii pipeline on the correct dataset",
        "if": "ctx?.data_stream?.dataset == 'pii'", 
        "name": "process-pii"
      }
    }
  ]
}

Now, let's test to see the sampling at work.

Load the data as described in the Data Loading Appendix. Let's use the sample data first; we will talk about how to test with your incoming or historical logs at the end of this blog.

If you look at Observability -> Logs -> Logs Explorer with the KQL filter data_stream.dataset : pii and break down by sample.sampled, you should see that approximately 10% of the logs are sampled.

At this point we have a composable ingest pipeline that is "sampling" logs. As a bonus, you can use this logs sampler for any other use cases you have as well.

Loading, Configuration, and Execution of the NER Pipeline

Loading the NER Model

You will need a Machine Learning node to run the NER model on. In this exercise, we are using Elastic Cloud Hosted Deployment on AWS with the CPU Optimized (ARM) architecture. The NER inference will run on a Machine Learning AWS c5d node. There will be GPU options in the future, but today, we will stick with CPU architecture.

This exercise will use a single c5d node with 8 GB of RAM and 4.2 vCPUs, burstable up to 8.4 vCPUs.

Please refer to the official documentation on how to import an NLP-trained model into Elasticsearch for complete instructions on uploading, configuring, and deploying the model.

The quickest way to get the model is using the Eland Docker method.

The following command will load the model into Elasticsearch but will not start it. We will do that in the next step.

docker run -it --rm --network host docker.elastic.co/eland/eland \
  eland_import_hub_model \
  --url https://mydeployment.es.us-west-1.aws.found.io:443/ \
  -u elastic -p password \
  --hub-model-id dslim/bert-base-NER --task-type ner

Deploy and Start the NER Model

In general, to improve ingest performance, increase throughput by adding more allocations to the deployment. For improved search speed, increase the number of threads per allocation.

To scale ingest, we will focus on scaling the allocations for the deployed model. More information on this topic is available here. The number of allocations must be less than the available allocated processors (cores, not vCPUs) per node.

To deploy and start the NER model, we will use the Start trained model deployment API.

We will configure the following:

  • 4 Allocations to allow for more parallel ingestion
  • 1 Thread per Allocation
  • 0 Byte Cache, as we expect a low cache hit rate
  • 8192 Queue
# Start the model with 4 allocations x 1 thread, no cache, and a queue of 8192
POST _ml/trained_models/dslim__bert-base-ner/deployment/_start?cache_size=0b&number_of_allocations=4&threads_per_allocation=1&queue_capacity=8192

You should get a response that looks something like this.

{
  "assignment": {
    "task_parameters": {
      "model_id": "dslim__bert-base-ner",
      "deployment_id": "dslim__bert-base-ner",
      "model_bytes": 430974836,
      "threads_per_allocation": 1,
      "number_of_allocations": 4,
      "queue_capacity": 8192,
      "cache_size": "0",
      "priority": "normal",
      "per_deployment_memory_bytes": 430914596,
      "per_allocation_memory_bytes": 629366952
    },
...
    "assignment_state": "started",
    "start_time": "2024-09-23T21:39:18.476066615Z",
    "max_assigned_allocations": 4
  }
}

The NER model has been deployed and started and is ready to be used.

The following ingest pipeline implements the NER model via the inference processor.

There is a significant amount of code here, but only two items are of interest at this point. The rest of the code is conditional logic to drive some additional specific behavior that we will look at more closely later.

  1. The inference processor calls the NER model we loaded previously by its ID and, via the field_map, passes it the message field as the text_field to analyze for PII.

  2. The script processor loops through the message field and uses the data generated by the NER model to replace the identified PII with redacted placeholders. This looks more complex than it really is: it simply loops through the array of ML predictions, replaces each predicted entity in the message string with a constant placeholder, and stores the result in a new field, redact.message. We will look at this a little closer in the following steps.

The code for the following three sections can be found here.

The NER PII Pipeline

logs-ner-pii-processor pipeline code - click to open/close
# NER Pipeline
DELETE _ingest/pipeline/logs-ner-pii-processor
PUT _ingest/pipeline/logs-ner-pii-processor
{
  "processors": [
    {
      "set": {
        "description": "Set to true to actually redact, false will run processors but leave original",
        "field": "redact.enable",
        "value": true
      }
    },
    {
      "set": {
        "description": "Set to true to keep ml results for debugging",
        "field": "redact.ner.keep_result",
        "value": true
      }
    },
    {
      "set": {
        "description": "Set to PER, LOC, ORG to skip, or NONE to not drop any replacement",
        "field": "redact.ner.skip_entity",
        "value": "NONE"
      }
    },
    {
      "set": {
        "description": "Set the minimum class probability (0.0 to 1.0) required to apply a replacement",
        "field": "redact.ner.minimum_score",
        "value": 0.0
      }
    },
    {
      "set": {
        "if" : "ctx.redact.message == null",
        "field": "redact.message",
        "copy_from": "message"
      }
    },
    {
      "set": {
        "field": "redact.successful",
        "value": true
      }
    },
    {
      "inference": {
        "model_id": "dslim__bert-base-ner",
        "field_map": {
          "message": "text_field"
        },
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "failure",
              "value": "REDACT_NER_FAILED"
            }
          },
          {
            "set": {
              "field": "redact.successful",
              "value": false
            }
          }
        ]
      }
    },
    {
      "script": {
        "if": "ctx.failure != 'REDACT_NER_FAILED'",
        "lang": "painless",
        "source": """String msg = ctx['message'];
          for (item in ctx['ml']['inference']['entities']) {
          	if ((item['class_name'] != ctx.redact.ner.skip_entity) && 
          	  (item['class_probability'] >= ctx.redact.ner.minimum_score)) {  
          		  msg = msg.replace(item['entity'], '<' + 
          		  'REDACTNER-'+ item['class_name'] + '>')
          	}
          }
          ctx.redact.message = msg""",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "failure",
              "value": "REDACT_REPLACEMENT_SCRIPT_FAILED",
              "override": false
            }
          },
          {
            "set": {
              "field": "redact.successful",
              "value": false
            }
          }
        ]
      }
    },
    {
      "remove": {
        "if": "ctx.redact.ner.keep_result != true",
        "field": [
          "ml"
        ],
        "ignore_missing": true,
        "ignore_failure": true
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "failure",
        "value": "GENERAL_FAILURE",
        "override": false
      }
    }
  ]
}

The updated PII Processor Pipeline, which now calls the NER Pipeline

process-pii pipeline code - click to open/close
# Updated Process PII pipeline that now calls the NER pipeline
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
  "processors": [
    {
      "set": {
        "description": "Set true if enabling sampling, otherwise false",
        "field": "sample.enabled",
        "value": true
      }
    },
    {
      "set": {
        "description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
        "field": "sample.sample_rate",
        "value": 1000
      }
    },
    {
      "set": {
        "description": "Set to false if you want to drop unsampled data, handy for reindexing historical data",
        "field": "sample.keep_unsampled",
        "value": true
      }
    },
    {
      "pipeline": {
        "if": "ctx.sample.enabled == true",
        "name": "logs-sampler",
        "ignore_failure": true
      }
    },
    {
      "pipeline": {
        "if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
        "name": "logs-ner-pii-processor"
      }
    }
  ]
}

Now reload the data as described in the Reloading the logs section below.

Results

Let's take a look at the results with the NER processing in place. In Logs Explorer, execute the following query in the KQL query bar:

data_stream.dataset : pii and ml.inference.entities.class_name : ("PER" and "LOC" and "ORG" )

Logs Explorer should look something like this. Open the top message to see the details.

NER Model Results

Let's take a closer look at what these fields mean.

Field:

ml.inference.entities.class_name

Sample Value:
[PER, PER, LOC, ORG, ORG]

Description: An array of the named entity classes that the NER model has identified.

Field:

ml.inference.entities.class_probability

Sample Value:
[0.999, 0.972, 0.896, 0.506, 0.595]

Description: The class_probability is a value between 0 and 1 that indicates how likely it is that a given data point belongs to a certain class. The higher the number, the higher the probability that the data point belongs to the named class. This is important because, in the next blog, we will decide on a threshold to use for alerting and redaction. You can see in this example that the model identified a LOC as an ORG; we can filter these out or find them by setting a threshold.
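A minimal sketch of threshold-based filtering using the sample values above. The threshold of 0.75 is an arbitrary illustration, not a recommended value:

```python
# Flag entities whose class_probability falls below a chosen threshold;
# low-confidence predictions are candidates for review rather than
# automatic redaction
THRESHOLD = 0.75

entities = list(zip(
    ["PER", "PER", "LOC", "ORG", "ORG"],
    [0.999, 0.972, 0.896, 0.506, 0.595],
))

low_confidence = [(c, p) for c, p in entities if p < THRESHOLD]
print(low_confidence)  # [('ORG', 0.506), ('ORG', 0.595)]
```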

Field:

ml.inference.entities.entity

Sample Value:
[Paul Buck, Steven Glens, South Amyborough, ME, Costco]

Description: The array of identified entities, which aligns positionally with the class_name and class_probability arrays.

Field:

ml.inference.predicted_value

Sample Value:
[2024-09-23T14:32:14.608207-07:00Z] log.level=INFO: Payment successful for order #4594 (user: [Paul Buck](PER&Paul+Buck), david59@burgess.net). Phone: 726-632-0527x520, Address: 3713 [Steven Glens](PER&Steven+Glens), [South Amyborough](LOC&South+Amyborough), [ME](ORG&ME) 93580, Ordered from: [Costco](ORG&Costco)

Description: The predicted value of the model.
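For illustration, the inline annotations in predicted_value can be parsed back out with a small regex. The annotation format, [entity text](CLASS&url+encoded+text), is assumed from the sample value above:

```python
import re

# Pull (entity, class) pairs back out of the annotated predicted_value;
# the annotation format is an assumption based on the sample shown above
ANNOTATION = re.compile(r"\[([^\]]+)\]\((PER|LOC|ORG|MISC)&[^)]*\)")

predicted = "Payment for [Paul Buck](PER&Paul+Buck), shipped to [South Amyborough](LOC&South+Amyborough)"
print(ANNOTATION.findall(predicted))
# [('Paul Buck', 'PER'), ('South Amyborough', 'LOC')]
```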

PII Assessment Dashboard

Let's take a quick look at a dashboard built to assess the PII data.

To load the dashboard, go to Kibana -> Stack Management -> Saved Objects and import the

pii-dashboard-part-1.ndjson
file that can be found here:

https://github.com/bvader/elastic-pii/elastic/pii-dashboard-part-1.ndjson

More complete instructions on Kibana Saved Objects can be found here.

After loading the dashboard, navigate to it and select the correct time range; you should see something like the image below. It shows metrics such as the sample rate, the percentage of logs with NER-identified entities, and NER score trends. We will examine the assessment and actions in part 2 of this blog.

Summary and Next Steps

In this first part of the blog, we have accomplished the following.

  • Reviewed the techniques and tools we have available for PII detection and assessment
  • Reviewed the role of NLP / NER in PII detection and assessment
  • Built the necessary composable ingest pipelines to sample logs and run them through the NER Model
  • Reviewed the NER results and are ready to move to the second blog

In the upcoming Part 2 of this blog, we will cover the following:

  • Redact PII using NER and the redact processor
  • Apply field-level security to control access to the un-redacted data
  • Enhance the dashboards and alerts
  • Production considerations and scaling
  • How to run these processes on incoming or historical data

Data Loading Appendix

Code

The data loading code can be found here:

https://github.com/bvader/elastic-pii

$ git clone https://github.com/bvader/elastic-pii.git

Creating and Loading the Sample Data Set

$ cd elastic-pii
$ cd python
$ python -m venv .env
$ source .env/bin/activate
$ pip install elasticsearch
$ pip install Faker

Run the log generator

$ python generate_random_logs.py

If you do not change any parameters, this will create 10000 random logs in a file named pii.log, with a mix of logs that do and do not contain PII.

Edit load_logs.py and set the following:

# The Elastic User 
ELASTIC_USER = "elastic"

# Password for the 'elastic' user generated by Elasticsearch
ELASTIC_PASSWORD = "askdjfhasldfkjhasdf"

# Found in the 'Manage Deployment' page
ELASTIC_CLOUD_ID = "deployment:sadfjhasfdlkjsdhf3VuZC5pbzo0NDMkYjA0NmQ0YjFiYzg5NDM3ZDgxM2YxM2RhZjQ3OGE3MzIkZGJmNTE0OGEwODEzNGEwN2E3M2YwYjcyZjljYTliZWQ="

Then run the following command.

$ python load_logs.py

Reloading the logs

Note: To reload the logs, you can simply re-run the above command. You can run the command multiple times during this exercise, and the logs will be reloaded (actually, loaded again). The new logs will not collide with previous runs, as each run has a unique run.id, which is displayed at the end of the loading process.

$ python load_logs.py
