A business generates enormous quantities of data every single day, and as the business grows and changes, that volume multiplies. To stay ahead of the competition, a business should collect and analyze this data. In this tutorial, I will share how I achieved data ingestion at scale using Logstash.
Nowadays every business runs on information, but there is a problem: the data is rarely in one place. It may have been created or stored on various systems and in different formats, which can make analyzing it very difficult. One of the products I work on consumes and analyzes huge quantities of data. For this purpose I used Elasticsearch, a distributed NoSQL database and one of the tools provided by the Elastic Stack. It helps you organize, parse, and analyze huge quantities of data.
Logstash is a plugin-based data collection and processing engine that makes it easy to collect, process, and forward data to different systems. The important part is that it helps in normalizing different schemas: data gathered from different systems is made available in a single format. Processing is organized into one or more pipelines. In each pipeline, one or more input plugins receive or collect data, which is placed on an internal queue; from there it is processed by any configured filter plugins and then pushed to the output plugin, in our case Elasticsearch.
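To make the input, filter, output structure concrete, here is a minimal sketch of a pipeline configuration. The file name, field name, and index name are hypothetical, and the Elasticsearch host is assumed to be local:

```conf
# minimal-pipeline.conf -- a hypothetical example pipeline
input {
  stdin { }                      # read events from standard input
}

filter {
  mutate {
    # enrich/normalize each event with an extra field
    add_field => { "ingested_by" => "logstash" }
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]   # assumed local Elasticsearch
    index => "demo-index"                # hypothetical index name
  }
}
```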
The product uses 13 pipelines, each using the JDBC input plugin to ingest data from Redshift. A Data Integration job runs on a schedule and incrementally pushes data to our source Redshift cluster. On the other end, we schedule each Logstash pipeline to run after its respective Data Integration job completes.
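A JDBC input for such a pipeline might look like the following sketch. The driver path, connection string, credentials, table, and schedule here are all placeholders, not our actual values; the `sql_last_value` tracking mechanism is what makes the load incremental:

```conf
# Hypothetical JDBC input pulling incrementally from Redshift.
input {
  jdbc {
    jdbc_driver_library => "/opt/drivers/redshift-jdbc42.jar"      # placeholder path
    jdbc_driver_class => "com.amazon.redshift.jdbc42.Driver"
    jdbc_connection_string => "jdbc:redshift://example-cluster:5439/dev"
    jdbc_user => "logstash_user"
    schedule => "0 * * * *"      # cron syntax: run hourly, after the DI job finishes
    statement => "SELECT * FROM events WHERE updated_at > :sql_last_value"
    use_column_value => true     # track a column so each run only fetches new rows
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
  }
}
```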
We also divided the pipelines between 2 Logstash instances to share the load, on two boxes with 8-core CPUs and 32 GB of memory, each running a Logstash process with Xmx set to 30 GB. Unfortunately, we ran into out-of-memory issues, and sometimes "Elasticsearch node not available" errors while sending data to the output plugin. So we used the Logstash monitoring APIs, such as node info, node stats, and hot threads, and found that at least 3-4 pipelines were handling high-load data. High loads are around 20 million records and above; low loads are anywhere below 1 million.
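These monitoring APIs are plain HTTP endpoints. The commands below assume Logstash's API is listening on its default port 9600 on localhost:

```shell
# Node info: pipeline, OS, and JVM details
curl -s "http://localhost:9600/_node?pretty"

# Node stats: per-pipeline event counts, queue sizes, JVM heap usage
curl -s "http://localhost:9600/_node/stats/pipelines?pretty"

# Hot threads: the busiest Logstash threads, useful for spotting CPU-heavy pipelines
curl -s "http://localhost:9600/_node/hot_threads?human=true"
```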
We tested with only one pipeline carrying high-load data and observed that within 3 hours CPU usage hit 100% and memory was fully utilized, resulting in a Logstash process crash.
We replaced the default, small in-memory queue with a large persisted queue on disk to improve reliability and resiliency. For this, we made the following changes in the Logstash configuration.
- queue.type: persisted
- Enables persisted queues. The default is memory.
- queue.max_bytes: 4gb
- The total capacity of the queue in bytes. The default is 1024mb.
- queue.max_events: 10000
- The maximum number of events allowed in the queue. The default is 0 (unlimited).
- queue.checkpoint.writes: 1
- Forces a checkpoint after each event is written, for durability. The default is 1024.
With the above settings, Logstash buffers events on disk until the queue reaches 4 GB or a maximum of 10,000 events. This handles back pressure: when the queue is full, Logstash applies back pressure on the inputs to stall data flowing into Logstash. The queue commits to disk through a mechanism called checkpointing; in our case a checkpoint is taken after each event. So even if Logstash is terminated or there is a hardware failure, we will not lose any data, as it is persisted to disk.
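In logstash.yml, the queue settings above look like this sketch (the optional path.queue line is an assumption about where one might place the queue, not part of our setup):

```yaml
# logstash.yml (queue settings only)
queue.type: persisted          # persist the queue on disk instead of in memory
queue.max_bytes: 4gb           # cap the on-disk queue at 4 GB
queue.max_events: 10000        # or at most 10,000 buffered events
queue.checkpoint.writes: 1     # checkpoint after every written event for durability
# path.queue: /var/lib/logstash/queue   # optional: put the queue on a dedicated disk
```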
Apart from the above changes, we also modified the schedule of our high-load pipelines. We changed them to trigger in between runs of the Data Integration job, dividing the load across multiple schedules on Logstash.
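Staggering is done through the schedule option of each pipeline's JDBC input. The cron expressions below are purely illustrative, chosen so the two high-load pipelines fire at different times between Data Integration runs:

```conf
# pipeline_orders.conf (hypothetical pipeline name)
input {
  jdbc {
    # ...connection settings as before...
    schedule => "0 1,13 * * *"   # 01:00 and 13:00
  }
}

# pipeline_clicks.conf (hypothetical pipeline name)
input {
  jdbc {
    # ...connection settings as before...
    schedule => "0 7,19 * * *"   # 07:00 and 19:00, offset from the other pipeline
  }
}
```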
Having 2 or more Logstash instances share the pipeline load with persisted queues helps balance data ingestion. Setting max_bytes and max_events to smaller values improves queue performance. Also, tuning the schedule around the input data is a must. This is how I achieved data ingestion at scale using Logstash. Let me know in the comments section how you use Logstash.