This article will show you how to keep an Elasticsearch index up to date with an API whose data is constantly changing. I will show you how to update it on a nightly interval, but the setup can easily be adapted to any frequency. Searching through data from a regular API over an HTTP endpoint can be slow. This setup will increase your search speeds dramatically while keeping your data fresh and in sync with whatever changes in your API.
We will be using AWS Lambda with a Node.js runtime, along with AWS EventBridge to set up a cron job. For HTTP requests I like axios, but feel free to use whichever HTTP client you prefer. Lastly, we will use CloudWatch to check our console.logs and verify the Lambda script is working.
This tutorial requires basic knowledge of AWS and JavaScript, plus an existing API supplying the data you want indexed. For the demo, I will be using https://reqres.in/. You are expected to know how to initialize AWS services and adapt my JavaScript code to your use case; however, I will provide links to tutorials for further detail in each section.
Create a new Elasticsearch domain and index if you do not already have one set up [1]. Wait for the Elasticsearch domain to finish initializing (this requires refreshing the page). Find your 'endpoint' on the domain's dashboard under the 'Overview' tab and jot it down for later use.
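Before moving on, you can confirm the domain is reachable by requesting the endpoint's root, which returns basic cluster info. Here is a minimal sketch using axios, assuming your domain's access policy allows requests from your machine (the placeholder is the endpoint you just jotted down):

// quick sanity check - substitute your actual domain endpoint
const axios = require("axios");

axios
  .get("https://<YourElasticsearchDomain>")
  .then((res) => console.log(res.data)) // cluster name, version, etc.
  .catch((err) => console.error("domain not reachable: ", err.message));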
Create a new lambda function [2] with a name relevant to your use case, and make sure the runtime is Node.js. The current version as of writing this article is Node.js 14.x. If you already have a role with basic Lambda and CloudWatch permissions, use that. Otherwise, select 'Create a new role with basic Lambda permissions'. Then open your newly created lambda, select 'Configuration', and then 'Permissions'. Under 'Execution role', click the role name to open it in a new tab. Click 'Attach policies' and search for 'cloudwatch'. Check off 'CloudWatchFullAccess' and then click 'Attach policy'. Go back to your lambda and click 'Code' to begin working on the code.
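If you prefer the AWS CLI over the console, attaching the policy is a single command (the role name here is a placeholder for whatever your execution role is called):

aws iam attach-role-policy \
  --role-name <YourLambdaExecutionRole> \
  --policy-arn arn:aws:iam::aws:policy/CloudWatchFullAccess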
If you have an HTTP request lambda layer and an Elasticsearch layer, add those. If you don't, or don't know what that is, create an empty project folder on your desktop. Add an index.js file to the folder and copy/paste in the existing code from your lambda in the AWS console. cd into this project folder in your terminal and run the following two commands.
npm init
npm i elasticsearch axios
Delete the package-lock.json and package.json files. Highlight the contents of the folder (index.js and the node_modules folder), right click, and select 'Compress'. Back in the AWS console for your lambda, select 'Upload from', then '.zip file', and upload the zip file we just created. If there are any residual files/folders other than index.js and the node_modules folder, go ahead and delete those from the AWS console.
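If you would rather compress from the terminal (assuming macOS or Linux with the zip utility installed), the equivalent is:

zip -r function.zip index.js node_modules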
You should now have initialized everything needed to complete this tutorial.
Make a few new lines at the top of your file and import the following, using the Elasticsearch domain endpoint from the previous section.
const AWS = require("aws-sdk");
const axios = require("axios");
const elasticsearch = require("elasticsearch");

const client = new elasticsearch.Client({
  host: "<YourElasticsearchDomain>",
});
Delete the contents within exports.handler to start fresh, make it an async function, and make sure 'context' is included in the function's parameters. Add a try/catch block for proper error handling. In your catch block, console.log the error and add 'context.fail(error)'. Your function should now look like this:
const AWS = require("aws-sdk");
const axios = require("axios");
const elasticsearch = require("elasticsearch");

const client = new elasticsearch.Client({
  host: "<YourElasticsearchDomain>",
});

exports.handler = async (event, context) => {
  try {
  } catch (error) {
    console.log("get users error: ", error);
    context.fail(error);
  }
};
Above your try/catch block, initialize two helper functions called 'insertElasticSearch' and 'getData'. Also, initialize two variables, 'currentItemCount' and 'currentPage'. In the getData function, we are going to pull in our data from our API, and in the insertElasticSearch function we will use the 'bulk' operation from the elasticsearch module to index our data. Then call the getData function from within the try block using the await keyword. The lambda function should now look like this:
const AWS = require("aws-sdk");
const axios = require("axios");
const elasticsearch = require("elasticsearch");

const client = new elasticsearch.Client({
  host: "<YourElasticsearchDomain>",
});

exports.handler = async (event, context) => {
  // console.log("event: ", event);
  let currentItemCount = 0;
  let currentPage = 1;

  const insertElasticSearch = async (list) => {
    const bulkRequestList = [];

    list.forEach((el) => {
      // bulk expects an action line followed by the document itself
      bulkRequestList.push({
        index: {
          _index: "testdata", // Elasticsearch index names must be lowercase
          _type: "node",
          _id: el.id,
        },
      });

      bulkRequestList.push(el);
    });

    const bulkInsertResult = await client.bulk({
      index: "testdata",
      body: bulkRequestList,
    });
    // console.log("bulkInsertResult: ", bulkInsertResult);
  };

  const getData = async (page) => {
    // console.log("page: ", page);
    const url = `https://reqres.in/api/users?page=${page}`;
    // console.log("url: ", url);

    const getDataResult = await axios.get(url);
    // console.log("getDataResult: ", getDataResult);

    if (
      getDataResult.data &&
      getDataResult.data.data &&
      getDataResult.data.data.length
    ) {
      // reqres.in returns the user array under 'data' and the total count under 'total'
      const totalResults = getDataResult.data.total;
      const userData = getDataResult.data.data;

      currentItemCount += userData.length;
      currentPage += 1;

      // console.log("currentItemCount: ", currentItemCount);
      // console.log("userData: ", userData);
      // console.log("userData[0]: ", userData[0]);

      await insertElasticSearch(userData);

      if (currentItemCount < totalResults) {
        // console.log("fetching next batch of users");

        await getData(currentPage);
      } else {
        console.log("succeeding - fetched data for total amount of users");
        context.succeed("fetched data for " + currentItemCount + " users");
      }
    }
  };

  try {
    await getData(1);
  } catch (error) {
    console.log("get users error: ", error);
    context.fail(error);
  }
};
Now that we have our helper functions in place, it is time to check if our index exists, delete it if it does, and then create a fresh index before repopulating the data. To do this we will use the 'indices' methods from the elasticsearch module. After adding this logic within the try block, above the getData invocation, your lambda function should look like this:
const AWS = require("aws-sdk");
const axios = require("axios");
const elasticsearch = require("elasticsearch");

const client = new elasticsearch.Client({
  host: "<YourElasticsearchDomain>",
});

exports.handler = async (event, context) => {
  // console.log("event: ", event);
  let currentItemCount = 0;
  let currentPage = 1;

  const insertElasticSearch = async (list) => {
    const bulkRequestList = [];

    list.forEach((el) => {
      // bulk expects an action line followed by the document itself
      bulkRequestList.push({
        index: {
          _index: "testdata", // Elasticsearch index names must be lowercase
          _type: "node",
          _id: el.id,
        },
      });

      bulkRequestList.push(el);
    });

    const bulkInsertResult = await client.bulk({
      index: "testdata",
      body: bulkRequestList,
    });
    // console.log("bulkInsertResult: ", bulkInsertResult);
  };

  const getData = async (page) => {
    // console.log("page: ", page);
    const url = `https://reqres.in/api/users?page=${page}`;
    // console.log("url: ", url);

    const getDataResult = await axios.get(url);
    // console.log("getDataResult: ", getDataResult);

    if (
      getDataResult.data &&
      getDataResult.data.data &&
      getDataResult.data.data.length
    ) {
      // reqres.in returns the user array under 'data' and the total count under 'total'
      const totalResults = getDataResult.data.total;
      const userData = getDataResult.data.data;

      currentItemCount += userData.length;
      currentPage += 1;

      // console.log("currentItemCount: ", currentItemCount);
      // console.log("userData: ", userData);
      // console.log("userData[0]: ", userData[0]);

      await insertElasticSearch(userData);

      if (currentItemCount < totalResults) {
        // console.log("fetching next batch of users");

        await getData(currentPage);
      } else {
        console.log("succeeding - fetched data for total amount of users");
        context.succeed("fetched data for " + currentItemCount + " users");
      }
    }
  };

  try {
    const indexExistsResult = await client.indices.exists({
      index: "testdata",
    });
    // console.log("indexExistsResult: ", indexExistsResult);

    if (indexExistsResult) {
      const deleteIndexResult = await client.indices.delete({
        index: "testdata",
      });
      // console.log("deleteIndexResult: ", deleteIndexResult);
    }

    // always recreate the index so we repopulate from a clean slate
    const createIndexResult = await client.indices.create({
      index: "testdata",
    });
    // console.log("createIndexResult: ", createIndexResult);

    await getData(1);
  } catch (error) {
    console.log("get users error: ", error);
    context.fail(error);
  }
};
Now we can test it out and see if it works. Click the dropdown beside the 'Test' button and configure a test event with the default settings from the 'hello-world' template. Uncomment any console.logs whose output you would like to see. I recommend keeping the console.logs commented out in production to reduce your CloudWatch bill. Click the 'Test' button, then under the 'Monitor' tab, open the logs in CloudWatch to see the outcome.
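If you want to confirm the documents actually landed in the index, a quick count query works well. This is a minimal sketch, assuming the same client and 'testdata' index from above; you can drop it temporarily at the end of the try block and remove it once verified:

// hypothetical check - remove once you've verified the data landed
const countResult = await client.count({ index: "testdata" });
console.log("documents in index: ", countResult.count);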
In your AWS console for your lambda, click 'Add trigger' and then select 'EventBridge'. Select 'Create a new rule' and give it a relevant name and description. In the 'Schedule expression' field, paste in the following:
cron(0 7 * * ? *)
This cron expression runs the lambda script every night at midnight in my timezone. EventBridge schedule expressions are evaluated in UTC, so double check the hour field and adjust it to match midnight wherever you live.
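For reference, the six fields are minutes, hours, day of month, month, day of week, and year. As an example of adjusting for a timezone, midnight Eastern Standard Time (UTC-5) would be:

cron(0 5 * * ? *)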
To test this, wait for the set time to roll around and then navigate to your CloudWatch logs for your lambda function. Double check that things are running smoothly, and that's it! You're all done :)
You should now have a grasp on how to repopulate an Elasticsearch index on a regular interval. Searching through your data should now be fast, optimized, and up to date with your third-party API. Learning to read from your Elasticsearch index involves some tricky syntax for building the parameters of the 'search' method [3], but with a little practice it starts to make sense, and it becomes easier than manipulating an HTTP endpoint for your third-party API while providing a significant performance increase.
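As a starting point, here is a minimal search sketch against the index we built in this tutorial; it assumes the 'testdata' index and the reqres.in user fields (first_name, last_name, and so on):

// minimal search sketch - field names assume the reqres.in user shape
const searchResult = await client.search({
  index: "testdata",
  body: {
    query: {
      match: { first_name: "George" },
    },
  },
});
// matching documents live under hits.hits
console.log(searchResult.hits.hits);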
Hopefully this article gave you deeper knowledge of the tools we used: Elasticsearch, Node, Lambda, and EventBridge. I hope I gave you some ideas to get you on your way while giving you a succinct article that doesn't completely overwhelm you. To learn more, I recommend exploring the articles referenced below.
If you notice any issues with this tutorial, please email me at kevinmurphywebdev (@) gmail (.) com so that I can update the article accordingly. Happy coding!