The first option that comes to mind is a shell script. It's simple and fast. Alright, let's do it.
Scope
- I have a CSV file, input.csv, with 10.1 million rows
- The file size is 2.1 GB
- I need to search for any of the 200 words I have in a terms.txt file
- I want the matching rows, i.e., rows containing any of the 200 search terms, in output.csv
#!/bin/bash
> output.csv # clear or create output file
# Read each search string from terms.txt
while IFS= read -r search; do
# Loop through all CSV files inside the folders (excluding our own output file)
find . -type f -name "*.csv" ! -name "output.csv" | while IFS= read -r csv; do
# Search for the string in the CSV file and append matching rows
grep -iF -- "$search" "$csv" >> output.csv
done
done < terms.txt
Wait.🤔
I'm supposed to get only 100 rows, but I got 103.
Let's say you have a file containing orders and customers with the following columns:
- Order Id
- Order details
- Order amount
- Customer name
- Customer age
- Customer phone number
- Customer Address
If I search for 58, the script will output the following rows, even though my intention is only to find orders whose amount is 58.
- All the orders whose order id contains 58
- All the orders with amount 58
- All customers of age 58
- All customers having 58 in their phone number
- All customers having 58 in their address
Of course, this is easier to de-duplicate by hand than searching through 10 million records in a file. Still, can we improve it? Can we avoid the manual de-duplication?
Ah, our savior, Python. How can anyone process a file without turning to Python? Python is powerful, flexible, and requires only a few lines of code (maybe as few as our shell script).
Here we have two options:
- Using the built-in `csv` module in Python, or
- Using pandas.
| Feature | `csv` module | `pandas` |
|---|---|---|
| Library type | Built-in | Third-party (install via `pip install pandas`) |
| Ease of use | Low-level, more manual | High-level, very convenient |
| Performance | Lightweight, faster for small files | Optimized, faster for large files |
| Memory usage | Lower | Higher (loads the full file into memory) |
| Search/filter support | Manual loops/string matching | One-liners using `.str.contains()` or `.isin()` |
| Column handling | Manual (by index, or by name with `DictReader`) | Automatic; supports labels, index, dtype |
| Data manipulation | Tedious | Very powerful: joins, grouping, etc. |
| Output writing | Manual (via `csv.writer`) | Simple `.to_csv()` |
Let's start with pandas.
import pandas as pd
df = pd.read_csv("input.csv")
search_terms = pd.read_csv("terms.txt", header=None)[0].tolist()
columns_to_search = ['col1', 'col3'] # use column names
# Create a boolean mask: True for rows where any searched column contains any term
mask = df[columns_to_search].apply(lambda col: col.astype(str).apply(
    lambda x: any(term in x for term in search_terms))).any(axis=1)
df[mask].to_csv("output.csv", index=False)
Running this throws a `DtypeWarning`.
This means pandas detected inconsistent types (e.g., some rows are strings, others numbers) in columns 16 and 24. It's a warning, not an error, but it can lead to unexpected behavior. When pandas reads the file, it infers each column's type as it goes; if a column contains numbers for the first few rows but strings later on, pandas complains like this.
So let's explicitly specify the type as string as follows.
df = pd.read_csv("input.csv", dtype=str)
Now I'm getting an error. It means that one of the term values in `search_terms` is not a string.
So let's convert all the search terms to string. Here is the updated code.
import pandas as pd
# Read entire CSV as string to avoid type issues
df = pd.read_csv("input.csv", dtype=str)
# Read search terms and convert them to string (important!)
search_terms = pd.read_csv("terms.txt", header=None)[0].astype(str).tolist()
# Columns to search (use correct column names from the CSV)
columns_to_search = ['col1', 'col3']
# Apply search across the specified columns
mask = df[columns_to_search].apply(
    lambda col: col.apply(lambda x: any(str(term) in x for term in search_terms))
).any(axis=1)
# Output the matching rows
df[mask].to_csv("output.csv", index=False)
It works, but it is slow; not something that you would want to run on your dev machine.
Can we use threads to make it faster?
🧠 First, Let's understand the constraints
CSV reading is typically I/O-bound, but filtering/searching can become CPU-bound if the file is large or has complex logic.
Maintaining output order means we must either:
- split the work predictably (e.g., by line number or by chunk) and reorder the results afterward, or
- use process/thread-safe queues with a sequence index and sort later.
import pandas as pd
from concurrent.futures import ProcessPoolExecutor
def process_chunk(index_chunk_tuple):
index, chunk, search_terms, columns = index_chunk_tuple
mask = chunk[columns].apply(lambda col: col.astype(str).apply(
    lambda x: any(term in x for term in search_terms))).any(axis=1)
return index, chunk[mask]
def parallel_filter(input_csv, terms_file, output_csv, columns_to_search, chunksize=10000):
search_terms = pd.read_csv(terms_file, header=None)[0].astype(str).tolist()  # keep terms as strings
reader = pd.read_csv(input_csv, chunksize=chunksize, dtype=str)  # read everything as strings
jobs = []
for i, chunk in enumerate(reader):
jobs.append((i, chunk, search_terms, columns_to_search))
with ProcessPoolExecutor() as executor:
results = list(executor.map(process_chunk, jobs))
# Sort results by original chunk order
results.sort(key=lambda x: x[0])
filtered_chunks = [res[1] for res in results]
pd.concat(filtered_chunks).to_csv(output_csv, index=False)
# Example call (run it under if __name__ == "__main__": so ProcessPoolExecutor can start its workers safely):
# parallel_filter("input.csv", "terms.txt", "output.csv", ["col1", "col3"])
OK, let's turn to the built-in csv module. It may require more code, but the hope is that it will run faster and use less memory.
import csv

with open('terms.txt') as f:
    search_terms = set(line.strip() for line in f)

columns_to_search = [1, 3]  # example column indices (0-based)

with open('input.csv', newline='') as infile, open('output.csv', 'w', newline='') as outfile:
    reader = csv.reader(infile)
    writer = csv.writer(outfile)
    writer.writerow(next(reader))  # copy the header row through unchanged
    for row in reader:
        if any(any(term in row[col] for term in search_terms) for col in columns_to_search):
            writer.writerow(row)
What now? Why don't we turn to our all-time favorite, Java? I sense some reluctance, but let's try.
Let's start with our own parser, which splits the columns on commas.
import java.io.*;
import java.nio.file.*;
import java.util.*;
import java.util.stream.*;
public class CsvFilter {
public static void main(String[] args) throws IOException {
Path inputCsv = Paths.get("input.csv");
Path termsFile = Paths.get("terms.txt");
Path outputCsv = Paths.get("output.csv");
List<String> searchTerms = Files.readAllLines(termsFile);
Set<String> searchSet = new HashSet<>(searchTerms); // de-duplicate the search terms
int[] columnsToSearch = {1, 2}; // example: columns 1 and 2
try (
        Stream<String> headerLine = Files.lines(inputCsv);
        Stream<String> lines = Files.lines(inputCsv);
        BufferedWriter writer = Files.newBufferedWriter(outputCsv)
) {
    // Write the header first. findFirst() is a terminal operation,
    // so a separate stream is used for the data lines below.
    writer.write(headerLine.findFirst().orElse("") + "\n");
    lines.skip(1)
.filter(line -> matchesSearch(line, columnsToSearch, searchSet))
.forEachOrdered(line -> {
try {
writer.write(line + "\n");
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}
}
static boolean matchesSearch(String line, int[] columns, Set<String> terms) {
String[] fields = line.split(",");
for (int col : columns) {
if (col < fields.length) {
for (String term : terms) {
if (fields[col].contains(term)) return true;
}
}
}
return false;
}
}
⚡ Can It Be Parallelized?
Yes, you can use .parallel() on the stream:
Files.lines(inputCsv).skip(1).parallel()
But there are two caveats:
- I/O-bound bottleneck: Files.lines() reads lines lazily, which isn't well-suited for parallel processing by default. You need to read all the lines first if you want true parallelism (see the sketch below).
- Order preservation: use .forEachOrdered(...) if you care about preserving line order. Without it, .forEach(...) may write rows out of order, which can be problematic for CSVs.
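For reference, here is a minimal sketch of that "read all lines first" variant. It reuses matchesSearch, columnsToSearch, searchSet, inputCsv and outputCsv from the CsvFilter class above, and it trades memory for parallelism, which is questionable for a 2.1 GB file, so treat it only as an illustration:

// Alternative body for CsvFilter.main(): load everything, then filter in parallel
List<String> allLines = Files.readAllLines(inputCsv);   // whole file in memory
String header = allLines.get(0);

List<String> matched = allLines.subList(1, allLines.size())
        .parallelStream()                                // genuinely parallel now
        .filter(line -> matchesSearch(line, columnsToSearch, searchSet))
        .collect(Collectors.toList());                   // collect() preserves encounter order

try (BufferedWriter writer = Files.newBufferedWriter(outputCsv)) {
    writer.write(header + "\n");
    for (String line : matched) {
        writer.write(line + "\n");
    }
}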
Hmm. Complicated. Can we turn to the popular CSV libraries in Java and see whether they have already solved this problem (i.e., not reading the entire file into memory while still employing parallelism)? We have OpenCSV and Apache Commons CSV. Both are similar, so let's go with the Apache Commons CSV parser.
import org.apache.commons.csv.*;
import java.io.*;
import java.nio.file.*;
import java.util.*;
import java.util.stream.*;
public class ApacheFilter {
public static void main(String[] args) throws IOException {
Path termsFile = Paths.get("terms.txt");
List<String> searchTerms = Files.readAllLines(termsFile)
.stream()
.map(String::toLowerCase)
.collect(Collectors.toList());
Set<String> columnsToSearch = Set.of("col1");
try (
        Reader in = new FileReader("input.csv");
        CSVParser parser = new CSVParser(in, CSVFormat.DEFAULT.withFirstRecordAsHeader());
        BufferedWriter out = new BufferedWriter(new FileWriter("output.csv"));
        CSVPrinter printer = new CSVPrinter(out,
                CSVFormat.DEFAULT.withHeader(parser.getHeaderMap().keySet().toArray(new String[0])))
) {
parser.stream()
.parallel()
.filter(record -> matches(record, columnsToSearch, searchTerms))
.sorted(Comparator.comparingLong(CSVRecord::getRecordNumber)) // preserve original order
.forEachOrdered(record -> {
try {
printer.printRecord(record);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}
}
private static boolean matches(CSVRecord record, Set<String> columnsToSearch, List<String> searchTerms) {
for (String column : columnsToSearch) {
String field = record.get(column).toLowerCase();
for (String term : searchTerms) {
if (field.contains(term)) {
return true;
}
}
}
return false;
}
}
✅ Can FileChannel Help?
FileChannel is faster than stream-based reading, so yes, it can help, but not directly with Apache Commons CSV.
To leverage FileChannel, you'd need to:
- Manually split the file into byte ranges (chunks) using FileChannel.map() or FileChannel.read(ByteBuffer).
- Ensure chunk boundaries don't break rows mid-line.
- Parse each chunk in a separate thread (with your own line parser or a library like uniVocity); see the sketch below.
It's more complex, but faster for large files.
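To make that concrete, here is a minimal sketch of the first two steps only: it computes line-aligned chunk boundaries with FileChannel.read(ByteBuffer). It assumes rows end with '\n' and that quoted fields contain no embedded newlines, and the class and method names (ChunkBoundaries, splitOnLineBoundaries) are made up for this illustration:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public class ChunkBoundaries {

    // Split the file into roughly equal byte ranges, then push each boundary
    // forward to the next '\n' so that no CSV row is cut in half.
    static List<long[]> splitOnLineBoundaries(Path file, int chunks) throws IOException {
        List<long[]> ranges = new ArrayList<>();
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = channel.size();
            long chunkSize = size / chunks;
            long start = 0;
            for (int i = 1; i <= chunks && start < size; i++) {
                long target = Math.max(start, Math.min(size, i * chunkSize));
                long end = (i == chunks) ? size : nextLineStart(channel, target, size);
                ranges.add(new long[]{start, end});
                start = end;
            }
        }
        return ranges;
    }

    // Scan forward from 'pos' and return the offset just after the next '\n' (or EOF).
    private static long nextLineStart(FileChannel channel, long pos, long size) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(8192);
        while (pos < size) {
            buffer.clear();
            int read = channel.read(buffer, pos);
            if (read <= 0) {
                break;
            }
            for (int i = 0; i < read; i++) {
                if (buffer.get(i) == '\n') {
                    return pos + i + 1;
                }
            }
            pos += read;
        }
        return size;
    }

    public static void main(String[] args) throws IOException {
        // Each [start, end) range could then be handed to its own parsing thread.
        for (long[] range : splitOnLineBoundaries(Paths.get("input.csv"), 4)) {
            System.out.println(range[0] + " .. " + range[1]);
        }
    }
}

Each resulting [start, end) byte range could then be parsed by its own worker thread.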
Ok. Can we then use FileChannel with OpenCSV?
FileChannel channel = FileChannel.open(Paths.get("input.csv"), StandardOpenOption.READ);
InputStream inputStream = Channels.newInputStream(channel);
Reader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
CSVReader csvReader = new CSVReader(reader);
Its performance is no better than not using FileChannel, since we still end up reading the file as one sequential stream. So what do we do?
uniVocity-parsers
Alright, after searching the internet, I found uniVocity-parsers. Let's give it a try.
<dependency>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
<version>2.9.1</version>
</dependency>
import com.univocity.parsers.csv.*;
import java.io.*;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.*;
import java.util.stream.Collectors;
public class CsvFilterWithUnivocity {
public static void main(String[] args) throws IOException {
Path inputCsv = Paths.get("input.csv");
Path termsFile = Paths.get("terms.txt");
Path outputCsv = Paths.get("output.csv");
Set<String> columnsToSearch = Set.of("col1");
// Load search terms
List<String> searchTerms = Files.readAllLines(termsFile).stream()
.map(String::toLowerCase)
.collect(Collectors.toList());
// Setup input with FileChannel
FileChannel channel = FileChannel.open(inputCsv, StandardOpenOption.READ);
InputStream inputStream = Channels.newInputStream(channel);
Reader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
// Prepare writer
BufferedWriter writer = Files.newBufferedWriter(outputCsv, StandardCharsets.UTF_8);
CsvWriterSettings writerSettings = new CsvWriterSettings();
CsvWriter csvWriter = new CsvWriter(writer, writerSettings);
CsvParserSettings settings = new CsvParserSettings();
settings.setHeaderExtractionEnabled(true);
CsvParser parser = new CsvParser(settings);
parser.beginParsing(reader);
String[] headers = parser.getContext().headers();
csvWriter.writeHeaders(headers);
String[] row;
while ((row = parser.parseNext()) != null) {
Map<String, String> rowMap = new HashMap<>();
for (int i = 0; i < headers.length; i++) {
rowMap.put(headers[i], row[i]);
}
if (matches(rowMap, columnsToSearch, searchTerms)) {
csvWriter.writeRow(row);
}
}
parser.stopParsing();
writer.close();
channel.close();
System.out.println("Done!");
}
private static boolean matches(Map<String, String> row, Set<String> columnsToSearch, List<String> searchTerms) {
for (String col : columnsToSearch) {
String value = Objects.toString(row.get(col), "").toLowerCase(); // empty fields come back as null
for (String term : searchTerms) {
if (value.contains(term)) {
return true;
}
}
}
return false;
}
}
Can we profile it to see where the bottleneck is and whether there is any room for improvement?
If you observe the above graph, you can clearly see that the CSV reader thread spends most of its time waiting on input reads. However, processing the data that has been read also consumes some CPU, and note that all of this happens in a single thread. So it is time to separate reading and processing onto their own threads.
Here is the updated code after profiling and optimizing. I found that 3 worker threads are optimal for my hardware (an Intel hexa-core with 12 hyper-threads); beyond that, adding threads did not improve the response time.
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
import com.univocity.parsers.csv.CsvWriter;
import com.univocity.parsers.csv.CsvWriterSettings;
import java.io.*;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.*;
public class MultiThreadedCsvFilter {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newWorkStealingPool(3);
ExecutorService outputExecutor = Executors.newSingleThreadExecutor();
Path inputCsv = Paths.get("input.csv");
Path outputCsv = Paths.get("output.csv");
Path termsFile = Paths.get("terms.txt");
Set<String> columnsToSearch = Set.of("Order Number"); // example columns
// Load search terms
List<String> searchTerms = Files.readAllLines(termsFile).stream()
.map(String::toLowerCase)
.collect(Collectors.toList());
// Setup input with FileChannel
FileChannel channel = FileChannel.open(inputCsv, StandardOpenOption.READ);
InputStream inputStream = Channels.newInputStream(channel);
Reader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
// Prepare writer
BufferedWriter writer = Files.newBufferedWriter(outputCsv, StandardCharsets.UTF_8);
CsvWriterSettings writerSettings = new CsvWriterSettings();
CsvWriter csvWriter = new CsvWriter(writer, writerSettings);
CsvParserSettings settings = new CsvParserSettings();
settings.setHeaderExtractionEnabled(true);
CsvParser parser = new CsvParser(settings);
parser.beginParsing(reader);
String[] headers = parser.getContext().headers();
csvWriter.writeHeaders(headers);
String[] row;
while ((row = parser.parseNext()) != null) {
var r = row;
executor.submit(() -> {
Map<String, String> rowMap = new HashMap<>();
for (int i = 0; i < headers.length; i++) {
rowMap.put(headers[i], r[i]);
}
if (matches(rowMap, columnsToSearch, searchTerms)) {
// writes are funneled through a single-threaded executor; note that rows may be
// written in completion order, not necessarily in the original input order
outputExecutor.submit(() -> csvWriter.writeRow(r));
}
});
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
outputExecutor.shutdown();
outputExecutor.awaitTermination(1, TimeUnit.MINUTES);
parser.stopParsing();
writer.close();
channel.close();
System.out.println("Done!");
}
private static boolean matches(Map<String, String> row, Set<String> columnsToSearch, List<String> searchTerms) {
for (String col : columnsToSearch) {
String value = Objects.toString(row.get(col), "").toLowerCase(); // empty fields come back as null
for (String term : searchTerms) {
if (value.contains(term)) {
return true;
}
}
}
return false;
}
}
JOB DONE!
Adding the final profile graph here. You can see that all 3 processing threads are fully occupied, which is great.