# File name
# Commit message
# Commit date
import re
import pyogrio
import pandas as pd
import geopandas as gpd
import multiprocessing as mp
import fiona
from tqdm import tqdm
def filter_rows_by_regex(file_path, pattern, chunk_size, chunk_num, queue, column='A8'):
    """Read one chunk of a vector file and keep rows whose *column* matches *pattern*.

    Parameters
    ----------
    file_path : str
        Path to a dataset readable by pyogrio (e.g. a GeoPackage).
    pattern : str
        Regular expression tested against the text column.
    chunk_size : int
        Number of features per chunk.
    chunk_num : int
        Zero-based index of the chunk to read.
    queue : multiprocessing.Queue (manager proxy)
        Progress channel; exactly one token is put per call.
    column : str, optional
        Name of the text column to filter on (default ``'A8'``,
        preserving the original hard-coded behavior).

    Returns
    -------
    geopandas.GeoDataFrame
        Rows of this chunk whose *column* matches *pattern*.
    """
    try:
        gdf_chunk = pyogrio.read_dataframe(
            file_path,
            skip_features=chunk_num * chunk_size,
            max_features=chunk_size,
        )
        regex = re.compile(pattern)
        # str.contains accepts a pre-compiled pattern; na=False treats
        # null values as non-matches instead of propagating NaN.
        return gdf_chunk[gdf_chunk[column].str.contains(regex, na=False)]
    finally:
        # Put the progress token even when the read or filter raises,
        # so the parent's progress-drain loop can never block forever.
        queue.put(1)
def parallel_process(file_path, pattern, chunk_size):
    """Filter a vector file by regex in parallel, chunk by chunk.

    Splits the dataset into ``chunk_size``-row chunks, filters each chunk
    in a worker process via :func:`filter_rows_by_regex`, and concatenates
    the matching rows.

    Parameters
    ----------
    file_path : str
        Path to a dataset readable by fiona/pyogrio (e.g. a GeoPackage).
    pattern : str
        Regular expression applied to the text column inside each worker.
    chunk_size : int
        Number of features per chunk; must be positive.

    Returns
    -------
    geopandas.GeoDataFrame
        All matching rows, with a fresh integer index.
    """
    # Count features without loading attributes or geometry.
    with fiona.open(file_path) as src:
        num_rows = len(src)
    # Ceiling division. The original `num_rows // chunk_size + 1` produced
    # an extra empty chunk whenever num_rows was an exact multiple of
    # chunk_size; max(1, ...) keeps one (empty) chunk for an empty file.
    num_chunks = max(1, -(-num_rows // chunk_size))
    manager = mp.Manager()
    queue = manager.Queue()
    results = []
    # Context managers guarantee the pool and the progress bar are
    # released even if a worker or queue operation raises.
    with mp.Pool(mp.cpu_count()) as pool, \
            tqdm(total=num_chunks, desc="Processing Chunks") as pbar:
        for i in range(num_chunks):
            results.append(
                pool.apply_async(
                    filter_rows_by_regex,
                    args=(file_path, pattern, chunk_size, i, queue),
                )
            )
        # No more tasks will be submitted.
        pool.close()
        # Advance the bar once per progress token from the workers.
        for _ in range(num_chunks):
            queue.get()
            pbar.update(1)
        pool.join()
    # .get() re-raises any exception that occurred inside a worker.
    filtered_chunks = [result.get() for result in results]
    return gpd.GeoDataFrame(pd.concat(filtered_chunks, ignore_index=True))
# Example usage
def main():
    """Extract apartment-zoned parcels from the Gyeongbuk land-use GeoPackage."""
    file_path = 'DATA/refined/geopackage/토지이용계획정보_국토교통부_경북_20240406.gpkg'
    # Match rows mentioning apartments or multi-family housing.
    pattern = r'아파트|공동주택'
    filtered_data = parallel_process(file_path, pattern, 8128)
    # Save or further process the resulting GeoDataFrame
    pyogrio.write_dataframe(filtered_data, "아파트구획_경북.gpkg")


# The guard is required for multiprocessing: under the spawn start method
# each child process re-imports this module, and without the guard every
# child would recursively re-launch the pool.
if __name__ == "__main__":
    main()