Introduction
Everything is a search problem. I heard someone say this during my first week at Elastic and the phrase has since taken up permanent residence in my brain. This blog is not intended as an analysis of pertinent statements made by my fantastic colleagues, but I do first want to take a moment to dissect this statement.
Since its inception, Elasticsearch has remained at the forefront of technology - breaking the mould in the software arena and powering the tech backbones of household name companies, the world over. We tend to categorise Elastic’s offering into several ‘OTB’ solutions - security, observability etc., but stripping these back, the problems we are solving are fundamentally search-oriented. It’s about being able to ask questions of your data and return meaningful and relevant results, regardless of the banner, and it’s this that makes Elastic such a powerful technology.
As a bit of an ode to search and a showcasing of some of Elastic’s capabilities, this blog will take you through the end-to-end development of a search application, using machine learning models for Named Entity Extraction (NER) and semantic searching; combining Elastic and non-Elastic components, and layering these with a simple UI to showcase the power of search.
The guide has been designed for use with Elastic Cloud, however, the same approach can be applied to locally hosted instances, with alterations to authentication methods and other cloud-specific concepts. The full git repository is available at: Kibana Search Project
Topics covered
- Logstash
- Ingest Pipelines
- ELSER
- Custom ML Models
- Streamlit
Step 1: Download BBC News dataset from Kaggle
The dataset we’re going to be using is the BBC News dataset, available as a CSV from: BBC News Dataset. This is a self-updating dataset, collecting RSS Feeds from BBC News. The dataset includes the title, description, date, url and various other attributes of published articles. We’re going to use Logstash to ingest the data, however, other methods i.e a Python Client or a standard upload are equally valid (Adding data to Elasticsearch | Elasticsearch Service Documentation | Elastic).
The raw data schema is a little problematic, so, if using Logstash, minor tweaks need to be made to the structure of the file. Unzip the downloaded file and run the script, modifying the input/output to reflect the location and name of the saved file on your local machine. This script will reorder the columns so the ‘Publish Date’ of the article comes first, to ease interpretation of the date field.
import csv
input_csv = 'path/bbc_news.csv'
output_csv = 'path/new-bbc_news.csv'
# Read in the old bbc_news CSV file
with open(input_csv, 'r') as infile:
reader = csv.DictReader(infile)
data = [row for row in reader]
# Write the file in the desired format
with open(output_csv, 'w', newline='') as outfile:
fieldnames = ['pubDate', 'title', 'guid', 'link', 'description']
writer = csv.DictWriter(outfile, fieldnames=fieldnames)
writer.writeheader()
# Write data in new format
for row in data:
writer.writerow({
'pubDate': row['pubDate'],
'title': row['title'],
'guid': row['guid'],
'link': row['link'],
'description': row['description']
})
print(f'Success. Output saved to {output_csv}')
Step 2: Use Docker to upload and start the custom NER ML Model
Next, we’re going to import the custom ML model for the NER task, for which, detailed documentation can be found here: How to deploy named entity recognition | Machine Learning in the Elastic Stack [8.11]. This tutorial uploads the custom model using docker, however, for alternative methods of installation, see: Introduction to custom machine learning models and maps | Elastic Blog.
Whilst there are many models we could utilise for NER, we’re going to use ‘distilbert-base-uncased’ from distilbert-base-uncased · Hugging Face. Optimised for lowercase text, this model proves beneficial in extracting entities with precision from unstructured data, in our case - with the assistance of Named Entity Recognition, we can extract people, places, organisations, etc. from our news articles, for use downstream.
To create a one-off API key for this task, we can call the _security endpoint, specifying our key requirements API Key Generation. Ensure you copy the encoded value generated by the request, as this value cannot be retrieved later. The API key we are creating will be used solely for this upload, therefore we can assign limited permissions and an expiry date:
POST /_security/api_key
{
"name": "ml-upload-key",
"expiration": "1d",
"role_descriptors": {
"ml_admin": {
"cluster": ["manage_ml", "monitor"],
"index": [
{
"names": ["*"],
"privileges": ["write", "read", "view_index_metadata"]
}
]
}
}
}
To import the model to your cluster, ensure Docker Desktop is up and running and run the following command in a terminal; setting the values of "CLOUD_ID" and "API_KEY" to reflect those associated with your Cloud Cluster.
docker run -it --rm docker.elastic.co/eland/eland:latest \
eland_import_hub_model \
--cloud-id $CLOUD_ID \
--es-api-key $API_KEY \
--hub-model-id "elastic/distilbert-base-uncased-finetuned-conll03-english" \
--task-type ner \
--start
If you are met with errors, ensure your Cloud ID and authentication credentials are correct, and that Docker is functioning as expected.
Step 3: Download ELSER
In this step, we’re going to download ELSER, Elastic’s ‘out-of-domain model’ to the stack. Navigate to Machine Learning -> Model Management -> Trained Models and select download on elser_model_2. For more information on installing ELSER in non-cloud environments, visit: ELSER – Elastic Learned Sparse EncodeR | Machine Learning in the Elastic Stack [8.12].
Step 4: Add mappings and pipelines in Elastic
Mappings in Elastic define the schema of our data. We need to add formal mappings for our BBC News index to ensure the data is typed as expected, and Elastic understands the structure, when we ship it to the cluster. As part of this mapping, we are excluding the tokens produced by the ELSER model to prevent mapping explosions, and defining a number of tags generated by the NER model. Navigate to Dev Tools and create the mapping:
PUT bbc-news-elser
{
"mappings": {
"_source": {
"excludes": [
"ml-elser-title.tokens",
"ml-elser-description.tokens"
]
},
"properties": {
"@timestamp": {
"type": "date"
},
"@version": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"description": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"event": {
"properties": {
"original": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"ml": {
"properties": {
"ner": {
"properties": {
"entities": {
"properties": {
"class_name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"class_probability": {
"type": "float"
},
"end_pos": {
"type": "long"
},
"entity": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"start_pos": {
"type": "long"
}
}
},
"model_id": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"predicted_value": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
},
"ml-elser-description": {
"properties": {
"model_id": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"tokens": {
"type": "rank_features"
}
}
},
"ml-elser-title": {
"properties": {
"model_id": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"tokens": {
"type": "rank_features"
}
}
},
"pubDate": {
"type": "date",
"format": "EEE, dd MMM yyyy HH:mm:ss 'GMT'",
"ignore_malformed": true
},
"tags": {
"properties": {
"LOC": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"MISC": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"ORG": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"PER": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"title": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"url": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
Pipelines define a series of data processing steps before indexing. Our ingest pipeline comprises field removal, model inferences for both ELSER and our custom NER model and a script to add the output values from our NER model run to a tags field.
PUT _ingest/pipeline/news-pipeline
{
"processors": [
{
"remove": {
"field": [
"host",
"message",
"log",
"@version"
],
"ignore_missing": true
}
},
{
"inference": {
"model_id": "elastic__distilbert-base-uncased-finetuned-conll03-english",
"target_field": "ml.ner",
"field_map": {
"title": "text_field"
}
}
},
{
"script": {
"lang": "painless",
"if": "return ctx['ml']['ner'].containsKey('entities')",
"source": "Map tags = new HashMap(); for (item in ctx['ml']['ner']['entities']) { if (!tags.containsKey(item.class_name)) tags[item.class_name] = new HashSet(); tags[item.class_name].add(item.entity);} ctx['tags'] = tags;"
}
},
{
"inference": {
"model_id": ".elser_model_2",
"target_field": "ml-elser-title",
"field_map": {
"title": "text_field"
},
"inference_config": {
"text_expansion": {
"results_field": "tokens"
}
}
}
},
{
"inference": {
"model_id": ".elser_model_2",
"target_field": "ml-elser-description",
"field_map": {
"description": "text_field"
},
"inference_config": {
"text_expansion": {
"results_field": "tokens"
}
}
}
}
]
}
Step 5: Use Logstash to ingest data
We now need to configure Logstash to ship the data to Elastic. Download Logstash (if not already downloaded), and follow the steps to install, documented here: Getting Started with Logstash.
Our configuration file comprises three elements: an input block, a filter block and an output block. Let’s take a second to run through the contents of each.
Input: Our input configures Logstash to read data from our CSV file located at the specified path. It starts reading from the beginning of the file, disables the sincedb feature, and assumes the file is in plain text.
input {
file {
path => "/path_to_file/new-bbc_news.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
codec => "plain"
}
}
Filter: This section applies filters to the incoming data. It uses the CSV filter to parse the CSV data, specifying the comma as the separator and defining the column names. To account for the presence of duplicate entries in the bbc news dataset, we apply a fingerprint filter to calculate a unique fingerprint based on the concatenation of the "title" and "link" fields, storing it in [@metadata][fingerprint]. The mutate filter renames the "link" field to "url" and removes the "guid" field.
filter {
csv {
separator => ","
columns => ["pubDate", "title", "guid", "link", "description"]
skip_header => true
quote_char => '"'
}
fingerprint {
source => ["title", "link"]
target => "[@metadata][fingerprint]"
}
mutate { rename => { "link" => "url" } }
}
Output: The final section configures the output destinations for the processed data. It sends the data to our Elasticsearch Cloud instance specified by the Cloud ID and credentials. The data is stored in the "bbc-news-elser" index - mapped in Section 2 - and our Ingest Pipeline named "news-pipeline" is applied. The document_id is set to the unique fingerprint generated by our fingerprint filter. Additionally, a copy of the data is printed to the console for debugging purposes using the rubydebug codec.
output {
elasticsearch {
cloud_id => "${CLOUD_ID}"
api_key => ${API_KEY}"
index => "bbc-news-elser"
pipeline => "news-pipeline"
document_id => "%{[@metadata][fingerprint]}"
}
stdout { codec => rubydebug }
}
Remember to set the CLOUD_ID and API_KEY as environment variables - or store in a key store, Logstash Keystore Guide - and ensure that the path to the CSV file is accurate. Note - you will need to create a new API key for Logstash with relevent permissions. You can either run Logstash directly from the command line using the '-f' flag to specify the configuration location, or use the pipelines file to point to the config. If opting for the pipeline approach, add the following lines to your pipelines.yml file:
- pipeline.id: bbc-news
path.config: "path-to-config"
Step 6: Verify data ingest
If all has gone smoothly, we should now be able to explore our BBC News data in the cluster.
Create a data view in Discover, or Stack Management, using the pubDate field as the ‘Timestamp field’.
Verify you can see the data:
To better understand what's going on under the hood with the NER model, we can query the data in Dev Tools, tailoring the response to return fields of interest:
GET bbc-news-elser/_search?size=1
{
"_source": ["ml.ner", "title"],
"fields": [
"ml.ner", "title"
]
}
This should return a response similar to the block below:
"_source": {
"title": "Chantelle Cameron beats Jessica McCaskill to become the undisputed light-welterweight champion",
"ml": {
"ner": {
"predicted_value": "[Chantelle Cameron](PER&Chantelle+Cameron) beats [Jessica McCaskill](PER&Jessica+McCaskill) to become the undisputed light-welterweight champion",
"entities": [
{
"entity": "chantelle cameron",
"class_name": "PER",
"class_probability": 0.9991421763363986,
"start_pos": 0,
"end_pos": 17
},
{
"entity": "jessica mccaskill",
"class_name": "PER",
"class_probability": 0.9992655100734016,
"start_pos": 24,
"end_pos": 41
}
],
"model_id": "elastic__distilbert-base-uncased-finetuned-conll03-english"
}
}
}
Breaking this snippet down, we can see our original "title" value, and the results produced by the NER model. The "predicted_value:" field shows the text with identified entities annotated. In this case, "Chantelle Cameron" and "Jessica McCaskill" have been identified as persons (PER). The "entities" object contains an objects, each representing a named entity recognised in the raw "title" field and comprises:
- "entity" - the string of the named entity as recognised in the text.
- "class_naAme" - the classification assigned to the entity i.e PER, LOC, ORG.
- "class_probability" - a decimal value representing the model's confidence in its classification of the entity. The values for both entities in the response above are close to 1, indicating high confidence.
- "start_pos" and "end_pos" - the start and end positions (zero-indexed) of the entity within the predicted_value text, which can be useful for applications requiring highlighting or further processing of specific entities in the text.
Step 7: Deploy search UI
In this final step, we introduce a Streamlit application that leverages our BBC News dataset for semantic and standard text searches.
First, follow the steps to install Streamlit, as described here: Streamlit Git Repo, or use the requirements.text file located in the git repository. Once installed, create a file called elasticapp.py and add the Python code block. As above, where we require an authentication to the cloud cluster , the “CLOUD_ID”, “API_KEY" variables need to be set before running (alternatively, user and password can be used to authenticate access to the cluster). This can be achieved by creating a dotenv file, or by exporting the variabls. For the latter approach, run the following commands:
export CLOUD_ID={{cloud_id}}
export API_KEY={{api_key}}
The user interface we’re implementing facilitates the input of both semantic and standard queries, the selection of an Elasticsearch index and subsequently initiates a search on our articles dataset. The Elasticsearch connection is established using our cloud credentials loaded from environment variables. The backend logic includes functions for fetching data based on user queries and updating the Streamlit app's display with the search results.
import streamlit as st
from elasticsearch import Elasticsearch
import os
from datetime import datetime
cloud_id = os.getenv("CLOUD_ID")
api_key = os.getenv("API_KEY")
es = Elasticsearch(
cloud_id=cloud_id,
api_key=api_key
)
def main():
st.title("Elasticsearch News App")
selected_index = st.sidebar.selectbox("Elasticsearch Index", ["bbc-news-elser"], key="selected_index")
if 'selected_tags' not in st.session_state:
st.session_state['selected_tags'] = {"LOC": set(), "PER": set(), "MISC": set()}
if 'search_results' not in st.session_state:
st.session_state['search_results'] = fetch_recent_data(selected_index, size=20)
semantic_query = st.text_input("Semantic Query:", key="semantic_query")
regular_query = st.text_input("Standard Query:", key="regular_query")
min_date, max_date = get_date_range(selected_index)
start_date = st.date_input("Start Date", min_date, key="start_date")
end_date = st.date_input("End Date", max_date, key="end_date")
if st.button("Search"):
st.session_state['search_results'] = fetch_data(selected_index, semantic_query, regular_query, start_date, end_date)
st.session_state['selected_tags'] = {tag_type: set() for tag_type in ["LOC", "PER", "MISC"]} # Reset filters on new search
for tag_type in ["LOC", "PER", "MISC"]:
current_tags = get_unique_tags(tag_type, st.session_state['search_results'])
st.session_state['selected_tags'][tag_type] = st.sidebar.multiselect(f"Filter by {tag_type}", current_tags, key=f"filter_{tag_type}")
filtered_results = filter_results_by_tags(st.session_state['search_results'], st.session_state['selected_tags'])
update_results(filtered_results)
def fetch_recent_data(index_name, size=100):
try:
query_body = {
"size": size,
"sort": [
{"pubDate": {"order": "desc"}}, # Primary sort by date
]
}
response = es.search(index=index_name, body=query_body)
return [hit['_source'] for hit in response['hits']['hits']]
except Exception as e:
st.error(f"Error fetching recent data from Elasticsearch: {e}")
return []
# Helper function to calculate the earliest and latest dates in the index
def get_date_range(index_name):
max_date_aggregation = {
"max_date": {
"max": {
"field": "pubDate"
}
}
}
min_date_aggregation = {
"min_date": {
"min": {
"field": "pubDate"
}
}
}
max_date_result = es.search(index=index_name, body={"aggs": max_date_aggregation})
min_date_result = es.search(index=index_name, body={"aggs": min_date_aggregation})
max_date_bucket = max_date_result['aggregations']['max_date']
min_date_bucket = min_date_result['aggregations']['min_date']
max_date = max_date_bucket['value_as_string']
min_date = min_date_bucket['value_as_string']
if max_date:
max_date = datetime.strptime(max_date, "%a, %d %b %Y %H:%M:%S GMT")
else:
max_date = datetime.today().date()
if min_date:
min_date = datetime.strptime(min_date, "%a, %d %b %Y %H:%M:%S GMT")
else:
min_date = datetime.today().date()
return min_date, max_date
# Updates results based on search
def update_results(results):
try:
for result_item in results:
# Display document titles as links
title_with_link = f"[{result_item['title']}]({result_item['url']})"
st.markdown(f"### {title_with_link}")
st.write(result_item['description'])
# Display timestamp with results
timestamp = result_item.get('pubDate', '')
if timestamp:
st.write(f"Published: {timestamp}")
# Adds tags for entities
tags = result_item.get('tags', {})
if tags:
for tag_type, tag_values in tags.items():
for tag_value in tag_values:
# Define colors for extracted entity tags
tag_color = {
"LOC": "#3498db",
"PER": "#2ecc71",
"MISC": "#e74c3c"
}.get(tag_type, "#555555")
st.markdown(
f"<span style='background-color: {tag_color}; color: white; padding: 5px; margin: 2px; border-radius: 5px;'>{tag_type}: {tag_value}</span>",
unsafe_allow_html=True)
st.write("---")
except Exception as e:
st.error(f"Error performing search in Elasticsearch: {e}")
# Fetch data from ES based on index + queries. Specify size - can be modified.
def fetch_data(index_name, semantic_query, regular_query, start_date=None, end_date=None, size=100):
try:
query_body = {
"size": size,
"query": {
"bool": {
"should": []
}
}
}
# Add semantic query if provided by the user
if semantic_query:
query_body["query"]["bool"]["should"].append(
{"bool": {
"should": {
"text_expansion": {
"ml-elser-title.tokens": {
"model_text": semantic_query,
"model_id": ".elser_model_2",
"boost": 9
}
},
"text_expansion": {
"ml-elser-description.tokens": {
"model_text": semantic_query,
"model_id": ".elser_model_2",
"boost": 9
}
}
}
}}
)
# Add regular query if provided by the user
if regular_query:
query_body["query"]["bool"]["should"].append({
"query_string": {
"query": regular_query,
"boost": 8
}
})
# Add date range if provided
if start_date or end_date:
date_range_query = {
"range": {
"pubDate": {}
}
}
if start_date:
date_range_query["range"]["pubDate"]["gte"] = start_date.strftime("%a, %d %b %Y %H:%M:%S GMT")
if end_date:
date_range_query["range"]["pubDate"]["lte"] = end_date.strftime("%a, %d %b %Y %H:%M:%S GMT")
query_body["query"]["bool"]["must"] = date_range_query
result = es.search(
index=index_name,
body=query_body
)
hits = result['hits']['hits']
data = [{'_id': hit['_id'], 'title': hit['_source'].get('title', ''),
'description': hit['_source'].get('description', ''),
'tags': hit['_source'].get('tags', {}), 'pubDate': hit['_source'].get('pubDate', ''),
'url': hit['_source'].get('url', '')} for hit in hits]
return data
except Exception as e:
st.error(f"Error fetching data from Elasticsearch: {e}")
return []
# Function to get unique tags of a specific type
def get_unique_tags(tag_type, results):
unique_tags = set()
for result_item in results:
tags = result_item.get('tags', {}).get(tag_type, [])
unique_tags.update(tags)
return sorted(unique_tags)
# Function to filter results based on selected tags
def filter_results_by_tags(results, selected_tags):
filtered_results = []
for result_item in results:
tags = result_item.get('tags', {})
add_result = True
for tag_type, selected_values in selected_tags.items():
if selected_values:
result_values = tags.get(tag_type, [])
if not any(value in selected_values for value in result_values):
add_result = False
break
if add_result:
filtered_results.append(result_item)
return filtered_results
if __name__ == "__main__":
main()
And that’s it. You should now be able to access your working application in your browser:
streamlit run elasticapp.py