Filtering a large (> 2 GB) CSV file?

Are you filtering a large CSV file, typically larger than 2 GB? Let's say you have such a file and you want to keep only the rows that match a set of search terms.

The first option that comes to mind is a shell script. It's simple and fast. Alright, let's do it.

Scope

  • I have a CSV file, input.csv, with 10.1 million rows
  • The file size is 2.1 GB
  • I need to search for any of the 200 terms I have in the terms.txt file
  • I want the matching rows, i.e., the rows containing any of the 200 search terms, in output.csv
#!/bin/bash

> output.csv  # clear or create output file

# Read each search term from terms.txt
while IFS= read -r search; do
  # Loop through all CSV files inside the folders
  find . -type f -name "*.csv" | while IFS= read -r csv; do
    # Search for the string in the CSV file and append matching rows
    grep -iF -- "$search" "$csv" >> output.csv
  done
done < terms.txt

 

👏Cool. It completed in 5:49 mins. Problem solved!

Wait.🤔

I'm supposed to get only 100 rows, but I got 103.

⚠️
Got it!   It is because we are matching the search text against the entire row, not restricting it to any particular column. Remember, we are dealing with CSV files.

Let's say you have a file containing orders and customers with the following columns:

  • Order Id
  • Order details 
  • Order amount
  • Customer name
  • Customer age
  • Customer phone number
  • Customer Address

If I search for 58, the script will output the following rows, though my intention is to pick only the orders with an amount of 58.

  • All the orders whose order id contains 58
  • All the orders with amount 58
  • All customers of age 58
  • All customers having 58 in their phone number
  • All customers having 58 in their address

Of course, weeding out these false matches is easier than manually searching 10 million records in a file. Still, can we improve it? Can we avoid the manual clean-up?

Ah, our savior, Python. How can anyone process a file without turning to Python? Python is powerful, flexible, and requires only a few lines of code (maybe as few as our shell script).

Here we have two options:

  1. Using the plain csv parser built into Python, or
  2. Using pandas.
| Feature | `csv` module | `pandas` |
| --- | --- | --- |
| Library type | Built-in | Third-party (install via `pip install pandas`) |
| Ease of use | Low-level, more manual | High-level, very convenient |
| Performance | Lightweight, faster for small files | Optimized, faster for large files |
| Memory usage | Lower | Higher (loads full file into memory) |
| Search/filter support | Manual loops/string matching | One-liners using `.str.contains()` or `.isin()` |
| Column handling | Manual (by index, or name if using DictReader) | Automatic, supports labels, index, dtype |
| Data manipulation | Tedious | Very powerful: joins, grouping, etc. |
| Output writing | Manual (via `csv.writer`) | Simple `.to_csv()` |

Let's start with pandas.

import pandas as pd

df = pd.read_csv("input.csv")
search_terms = pd.read_csv("terms.txt", header=None)[0].tolist()
columns_to_search = ['col1', 'col3']  # use column names

# Create a boolean mask
mask = df[columns_to_search].apply(lambda col: col.astype(str).apply(
    lambda x: any(term in x for term in search_terms)), axis=1).any(axis=1)

df[mask].to_csv("output.csv", index=False)
It throws the following warning:

DtypeWarning: Columns (16,24) have mixed types. Specify dtype option on import or set low_memory=False.

This means pandas detected inconsistent types (e.g., some rows are strings, others numbers) in columns 16 and 24. It's a warning, not an error, but it can lead to unexpected behavior. When pandas reads the file, it infers each column's type as it goes; if a column holds numbers for some rows but strings for others, pandas complains like this.

So let's explicitly specify the type as string as follows.

df = pd.read_csv("input.csv", dtype=str)

 Now I'm getting the following error.

line 8, in <module>
mask = df[columns_to_search].apply(lambda col: col.astype(str).apply(
~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
lambda x: any(term in x for term in search_terms)), axis=1).any(axis=1)

TypeError: 'in <string>' requires string as left operand, not int

That error means that one of the term values in search_terms is not a string. 

So let's convert all the search terms to strings. Here is the updated code.

import pandas as pd

# Read entire CSV as string to avoid type issues
df = pd.read_csv("input.csv", dtype=str)

# Read search terms and convert them to string (important!)
search_terms = pd.read_csv("terms.txt", header=None)[0].astype(str).tolist()

# Columns to search (use correct column names from the CSV)
columns_to_search = ['col1', 'col3']

# Apply search across the specified columns
mask = df[columns_to_search].apply(
    lambda col: col.apply(lambda x: any(str(term) in x for term in search_terms)),
    axis=1
).any(axis=1)

# Output the matching rows
df[mask].to_csv("output.csv", index=False)
⚠️
It's running...   It has been running for the past 26 minutes, occupying more than 20 GB of RAM, and still hasn't completed.

This is not something you would want to run on your dev machine.

Can we use threads to make it faster?

🧠 First, let's understand the constraints

  • CSV reading is typically I/O-bound, but filtering/searching can become CPU-bound if the file is large or has complex logic.

  • Maintaining output order means:

    • We must either split work predictably (e.g., by line number or chunks) and reorder the results afterward, or

    • Use process/thread-safe queues with a sequence index and sort later.

import pandas as pd
from concurrent.futures import ProcessPoolExecutor

def process_chunk(index_chunk_tuple):
    index, chunk, search_terms, columns = index_chunk_tuple
    mask = chunk[columns].apply(lambda col: col.astype(str).apply(
        lambda x: any(term in x for term in search_terms)), axis=1).any(axis=1)
    return index, chunk[mask]

def parallel_filter(input_csv, terms_file, output_csv, columns_to_search, chunksize=10000):
    search_terms = pd.read_csv(terms_file, header=None)[0].astype(str).tolist()  # keep terms as strings
    reader = pd.read_csv(input_csv, chunksize=chunksize, dtype=str)  # read everything as string, as before
    
    jobs = []
    for i, chunk in enumerate(reader):
        jobs.append((i, chunk, search_terms, columns_to_search))

    with ProcessPoolExecutor() as executor:
        results = list(executor.map(process_chunk, jobs))

    # Sort results by original chunk order
    results.sort(key=lambda x: x[0])
    filtered_chunks = [res[1] for res in results]

    pd.concat(filtered_chunks).to_csv(output_csv, index=False)

# Example call:
# parallel_filter("input.csv", "terms.txt", "output.csv", ["col1", "col3"])
Finally! It took 5:29 mins and used around 5 GB of RAM.

OK, let's turn to the built-in csv module. It may require more code, but let's hope it runs faster and uses less memory.

import csv

with open('terms.txt') as f:
    search_terms = set(line.strip() for line in f)
columns_to_search = [1, 3]  # example column indices (0-based)

with open('input.csv', newline='') as infile, open('output.csv', 'w', newline='') as outfile:
    reader = csv.reader(infile)
    writer = csv.writer(outfile)
    writer.writerow(next(reader))  # copy the header row through unchanged
    for row in reader:
        if any(any(term in row[col] for term in search_terms) for col in columns_to_search):
            writer.writerow(row)
👏Wow, it is quick! It took only 3:06 mins and used 8 MB of RAM.

What now? Why don't we turn to our all-time favorite, Java? I sense some reluctance, but let's try.

Let's start with our own parser, which splits the columns on commas.

import java.io.*;
import java.nio.file.*;
import java.util.*;
import java.util.stream.*;

public class CsvFilter {

    public static void main(String[] args) throws IOException {
        Path inputCsv = Paths.get("input.csv");
        Path termsFile = Paths.get("terms.txt");
        Path outputCsv = Paths.get("output.csv");

        List<String> searchTerms = Files.readAllLines(termsFile);
        Set<String> searchSet = new HashSet<>(searchTerms); // drop duplicate terms

        int[] columnsToSearch = {1, 2}; // example: columns 1 and 2

        try (
            Stream<String> headerLine = Files.lines(inputCsv);
            Stream<String> lines = Files.lines(inputCsv);
            BufferedWriter writer = Files.newBufferedWriter(outputCsv)
        ) {
            // Write the header (first line) as-is
            String header = headerLine.findFirst().orElse("");
            writer.write(header + "\n");

            // Filter the remaining lines and append every match
            lines.skip(1)
                .filter(line -> matchesSearch(line, columnsToSearch, searchSet))
                .forEachOrdered(line -> {
                    try {
                        writer.write(line + "\n");
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
        }
    }

    static boolean matchesSearch(String line, int[] columns, Set<String> terms) {
        String[] fields = line.split(",");
        for (int col : columns) {
            if (col < fields.length) {
                for (String term : terms) {
                    if (fields[col].contains(term)) return true;
                }
            }
        }
        return false;
    }
}
Unbelievable!  It took 41 secs and used 360 MB of RAM.

⚡ Can It Be Parallelized?

Yes — you can use .parallel() on the stream:

Files.lines(inputCsv).skip(1).parallel()

But there are two caveats:

  1. I/O-bound bottleneck: Files.lines() reads line by line lazily, which isn't well suited for parallel processing by default. You need to read all lines into memory first if you want true parallelism (see the sketch after this list).
  2. Order preservation:
    1. Use .forEachOrdered(...) if you care about preserving line order.
    2. Without it, .forEach(...) may write rows out of order, which can be problematic for CSVs.
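
To make that concrete, here is a minimal, self-contained sketch of the read-everything-first approach. It reuses the same naive comma-splitting matcher as the CsvFilter example above; note that Files.readAllLines pulls the entire 2 GB file onto the heap, which is the price you pay for easy parallelism.

import java.io.*;
import java.nio.file.*;
import java.util.*;

public class ParallelCsvFilter {

    public static void main(String[] args) throws IOException {
        Path inputCsv = Paths.get("input.csv");
        Path outputCsv = Paths.get("output.csv");
        Set<String> searchSet = new HashSet<>(Files.readAllLines(Paths.get("terms.txt")));
        int[] columnsToSearch = {1, 2};

        // Load the whole file into memory so the parallel stream can split the work evenly
        List<String> allLines = Files.readAllLines(inputCsv);

        try (BufferedWriter writer = Files.newBufferedWriter(outputCsv)) {
            writer.write(allLines.get(0) + "\n"); // header

            allLines.stream()
                .skip(1)
                .parallel()
                .filter(line -> matchesSearch(line, columnsToSearch, searchSet))
                .forEachOrdered(line -> {          // preserves the original line order
                    try {
                        writer.write(line + "\n");
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
        }
    }

    // Same naive comma-split matching as in the CsvFilter example above
    static boolean matchesSearch(String line, int[] columns, Set<String> terms) {
        String[] fields = line.split(",");
        for (int col : columns) {
            if (col < fields.length) {
                for (String term : terms) {
                    if (fields[col].contains(term)) return true;
                }
            }
        }
        return false;
    }
}

Whether this beats the lazy single-threaded version depends on how much heap you can spare for those 10 million strings.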

Hmm. Complicated. Can we turn to the popular CSV libraries in Java and see if they have already solved this problem (i.e., not reading the entire file into memory while still employing parallelism)? We have the opencsv and Apache Commons CSV parsers. Both are similar, so let's go with the Apache CSV parser.

import org.apache.commons.csv.*;

import java.io.*;
import java.nio.file.*;
import java.util.*;
import java.util.stream.*;

public class ApacheFilter {

    public static void main(String[] args) throws IOException {
        Path termsFile = Paths.get("terms.txt");

        List<String> searchTerms = Files.readAllLines(termsFile)
                .stream()
                .map(String::toLowerCase)
                .collect(Collectors.toList());
        Set<String> columnsToSearch = Set.of("col1");

        try (
                Reader in = new FileReader("input.csv");
                CSVParser parser = new CSVParser(in, CSVFormat.DEFAULT.withFirstRecordAsHeader());
                BufferedWriter out = new BufferedWriter(new FileWriter("output.csv"));
                CSVPrinter printer = new CSVPrinter(out, CSVFormat.DEFAULT.withHeader(parser.getHeaderMap().keySet().toArray(new String[0])))
        ) {
            
            parser.stream()
                .parallel()
                .filter(record -> matches(record, columnsToSearch, searchTerms))
                .sorted(Comparator.comparingLong(CSVRecord::getRecordNumber)) // preserve original order
                .forEachOrdered(record -> {
                    try {
                        printer.printRecord(record);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
        }
    }

    private static boolean matches(CSVRecord record, Set<String> columnsToSearch, List<String> searchTerms) {
        for (String column : columnsToSearch) {
            String field = record.get(column).toLowerCase();
            for (String term : searchTerms) {
                if (field.contains(term)) {
                    return true;
                }
            }
        }
        return false;
    }
}
Not that great!  It took 54 secs and used 700 MB. I didn't set -Xmx though.

 

✅ Can FileChannel Help?

FileChannel is faster than streams. So yes, it can help, but not directly with Apache Commons CSV.

To leverage FileChannel, you’d need to:

  1. Manually split the file into byte ranges (chunks) using FileChannel.map() or FileChannel.read(ByteBuffer).

  2. Ensure chunk boundaries don’t break rows mid-line.

  3. Parse each chunk in a separate thread (with your own line parser or a library like uniVocity).

It’s more complex, but faster for large files.
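
As an illustration, here is a minimal sketch of steps 1 and 2 only, assuming a plain UTF-8 CSV with \n line endings and a hypothetical chunkCount of 4; the per-chunk parsing of step 3 is left out.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.util.*;

public class ChunkBoundarySketch {

    public static void main(String[] args) throws IOException {
        Path inputCsv = Paths.get("input.csv");
        int chunkCount = 4; // hypothetical: one chunk per worker thread

        try (FileChannel channel = FileChannel.open(inputCsv, StandardOpenOption.READ)) {
            long size = channel.size();
            long chunkSize = size / chunkCount;
            List<long[]> ranges = new ArrayList<>(); // {startByte, endByte} pairs

            long start = 0;
            while (start < size) {
                // Tentative end, then pushed forward to the next '\n' so no row is split
                long end = Math.min(size, start + chunkSize);
                end = nextNewline(channel, end, size);
                ranges.add(new long[]{start, end});
                start = end;
            }

            // Each range can now be mapped (FileChannel.map) or read (FileChannel.read)
            // and parsed by its own thread without breaking rows mid-line.
            for (long[] r : ranges) {
                System.out.println("Chunk: bytes " + r[0] + " to " + r[1]);
            }
        }
    }

    // Scan forward from 'from' until a newline (or end of file) is found.
    static long nextNewline(FileChannel channel, long from, long size) throws IOException {
        ByteBuffer probe = ByteBuffer.allocate(1);
        for (long pos = from; pos < size; pos++) {
            probe.clear();
            channel.read(probe, pos);
            if (probe.get(0) == '\n') {
                return pos + 1; // chunk ends just after the newline
            }
        }
        return size;
    }
}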

 

Ok. Can we then use FileChannel with OpenCSV?

FileChannel channel = FileChannel.open(Paths.get("input.csv"), StandardOpenOption.READ);
InputStream inputStream = Channels.newInputStream(channel);
Reader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);

CSVReader csvReader = new CSVReader(reader);

Its performance is about the same as without FileChannel. So what do we do next?

uniVocity-parsers

Alright, after searching the internet, I found uniVocity-parsers. Let's give it a try.


<dependency>
  <groupId>com.univocity</groupId>
  <artifactId>univocity-parsers</artifactId>
  <version>2.9.1</version>
</dependency>
import com.univocity.parsers.csv.*;

import java.io.*;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.*;
import java.util.stream.Collectors;

public class CsvFilterWithUnivocity {

    public static void main(String[] args) throws IOException {
        Path inputCsv = Paths.get("input.csv");
        Path termsFile = Paths.get("terms.txt");
        Path outputCsv = Paths.get("output.csv");

        Set<String> columnsToSearch = Set.of("col1");

        // Load search terms
        List<String> searchTerms = Files.readAllLines(termsFile).stream()
                .map(String::toLowerCase)
                .collect(Collectors.toList());

        // Setup input with FileChannel
        FileChannel channel = FileChannel.open(inputCsv, StandardOpenOption.READ);
        InputStream inputStream = Channels.newInputStream(channel);
        Reader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);

        // Prepare writer
        BufferedWriter writer = Files.newBufferedWriter(outputCsv, StandardCharsets.UTF_8);
        CsvWriterSettings writerSettings = new CsvWriterSettings();
        CsvWriter csvWriter = new CsvWriter(writer, writerSettings);

        CsvParserSettings settings = new CsvParserSettings();
        settings.setHeaderExtractionEnabled(true);

        CsvParser parser = new CsvParser(settings);
        parser.beginParsing(reader);

        String[] headers = parser.getContext().headers();
        csvWriter.writeHeaders(headers);

        String[] row;
        while ((row = parser.parseNext()) != null) {
            Map<String, String> rowMap = new HashMap<>();
            for (int i = 0; i < headers.length; i++) {
                rowMap.put(headers[i], row[i]);
            }

            if (matches(rowMap, columnsToSearch, searchTerms)) {
                csvWriter.writeRow(row);
            }
        }

        parser.stopParsing();
        writer.close();
        channel.close();

        System.out.println("Done!");
    }

    private static boolean matches(Map<String, String> row, Set<String> columnsToSearch, List<String> searchTerms) {
        for (String col : columnsToSearch) {
            String value = row.getOrDefault(col, "").toLowerCase();
            for (String term : searchTerms) {
                if (value.contains(term)) {
                    return true;
                }
            }
        }
        return false;
    }
}
Great!  It took 25 secs and used 550 MB. I didn't set -Xmx though.

Can we profile it to see where the bottleneck is and whether there is any room for improvement?

 

If you observe the above graph, you can clearly see that the CSV reader thread spends most of its time waiting on input, while processing the data it has read also consumes some CPU. Note that both happen in a single thread. So it is time to separate reading and processing onto their own threads.

Here is the updated code after profiling and optimizing. I found that 3 worker threads are optimal for my hardware (an Intel hexa-core with 12 hyper-threads); beyond that, adding threads does not improve the response time.

import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
import com.univocity.parsers.csv.CsvWriter;
import com.univocity.parsers.csv.CsvWriterSettings;

import java.io.*;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.*;

public class MultiThreadedCsvFilter {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newWorkStealingPool(3);
        ExecutorService outputExecutor = Executors.newSingleThreadExecutor();

        Path inputCsv = Paths.get("input.csv");
        Path outputCsv = Paths.get("output.csv");
        Path termsFile = Paths.get("terms.txt");
        Set<String> columnsToSearch = Set.of("Order Number"); // example columns

        // Load search terms
        List<String> searchTerms = Files.readAllLines(termsFile).stream()
                .map(String::toLowerCase)
                .collect(Collectors.toList());

        // Setup input with FileChannel
        FileChannel channel = FileChannel.open(inputCsv, StandardOpenOption.READ);
        InputStream inputStream = Channels.newInputStream(channel);
        Reader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);

        // Prepare writer
        BufferedWriter writer = Files.newBufferedWriter(outputCsv, StandardCharsets.UTF_8);
        CsvWriterSettings writerSettings = new CsvWriterSettings();
        CsvWriter csvWriter = new CsvWriter(writer, writerSettings);

        CsvParserSettings settings = new CsvParserSettings();
        settings.setHeaderExtractionEnabled(true);

        CsvParser parser = new CsvParser(settings);
        parser.beginParsing(reader);

        String[] headers = parser.getContext().headers();
        csvWriter.writeHeaders(headers);

        String[] row;
        while ((row = parser.parseNext()) != null) {
            var r = row;
            executor.submit(() -> {
                Map<String, String> rowMap = new HashMap<>();
                for (int i = 0; i < headers.length; i++) {
                    rowMap.put(headers[i], r[i]);
                }

                if (matches(rowMap, columnsToSearch, searchTerms)) {
                    outputExecutor.submit(() -> csvWriter.writeRow(r));
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        outputExecutor.shutdown();
        outputExecutor.awaitTermination(1, TimeUnit.MINUTES);
        parser.stopParsing();
        writer.close();
        channel.close();

        System.out.println("Done!");
    }

    private static boolean matches(Map<String, String> row, Set<String> columnsToSearch, List<String> searchTerms) {
        for (String col : columnsToSearch) {
            String value = row.getOrDefault(col, "").toLowerCase();
            for (String term : searchTerms) {
                if (value.contains(term)) {
                    return true;
                }
            }
        }
        return false;
    }
}
Excellent!  It took 9 secs now.

JOB DONE!

Adding the final profile graph here. You can see that all three processing threads are fully occupied, which is great.

 

 


How to retry a method call in Spring or Quarkus?

Have you ever come across a situation where you wanted to retry a method invocation automatically? Let's say you are calling a stock ticker service for a given stock and get a transient error. Since it is a transient error, you will try again and it may work in second attempt. But what if it doesn't? Well, you will try third time. But how many times can you try like that? More importantly after how much time will you retry? Imagine if you have a handful of methods like this. Your code will become convoluted with retry logic. Is there a better way? Well, if you are using spring/spring boot, you are in luck. Here is how you can do that using spring. Let's write our business service as follows. import java.time.LocalDateTime; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.retry.annotation.Backoff; import org.springframework.retry.annotation.Retryable; import org.springframework.scheduling.annotation.Async; import...