import re
import multiprocessing as mp

import fiona
import geopandas as gpd
import pandas as pd
import pyogrio
from tqdm import tqdm


def filter_rows_by_regex(file_path, pattern, chunk_size, chunk_num, queue):
    """Read one chunk of features from a vector file and keep rows whose
    'A8' attribute matches *pattern*.

    Parameters
    ----------
    file_path : str
        Path to a pyogrio-readable vector dataset (e.g. a GeoPackage).
    pattern : str
        Regular-expression source string; compiled inside the worker so only
        a plain string crosses the process boundary.
    chunk_size : int
        Number of features per chunk.
    chunk_num : int
        Zero-based chunk index; the chunk starts at ``chunk_num * chunk_size``.
    queue : multiprocessing queue
        Progress channel; one token is put per completed chunk.

    Returns
    -------
    geopandas.GeoDataFrame
        The rows of this chunk whose 'A8' value contains the pattern
        (NaN values treated as non-matching).
    """
    gdf_chunk = pyogrio.read_dataframe(
        file_path,
        skip_features=chunk_num * chunk_size,
        max_features=chunk_size,
    )
    regex = re.compile(pattern)
    filtered_chunk = gdf_chunk[gdf_chunk['A8'].str.contains(regex, na=False)]
    queue.put(1)  # Signal one finished chunk to the progress-bar loop.
    return filtered_chunk


def parallel_process(file_path, pattern, chunk_size):
    """Filter a vector dataset in parallel, chunk by chunk.

    Splits the file into ``ceil(num_rows / chunk_size)`` chunks, filters each
    in a worker process via :func:`filter_rows_by_regex`, shows a progress
    bar, and concatenates the per-chunk results.

    Parameters
    ----------
    file_path : str
        Path to the input dataset.
    pattern : str
        Regular expression applied to the 'A8' column.
    chunk_size : int
        Features per chunk; must be positive.

    Returns
    -------
    geopandas.GeoDataFrame
        All matching rows, re-indexed from 0.
    """
    # Count features up front with fiona so the work can be partitioned.
    with fiona.open(file_path) as src:
        num_rows = len(src)

    # Ceiling division: the original (num_rows // chunk_size) + 1 scheduled a
    # spurious empty chunk whenever num_rows was an exact multiple of
    # chunk_size.
    num_chunks = -(-num_rows // chunk_size)

    # A Manager queue is picklable, so workers can report progress through it.
    manager = mp.Manager()
    queue = manager.Queue()
    pool = mp.Pool(mp.cpu_count())

    pbar = tqdm(total=num_chunks, desc="Processing Chunks")
    try:
        # Submit every chunk; results come back in submission order.
        results = [
            pool.apply_async(
                filter_rows_by_regex,
                args=(file_path, pattern, chunk_size, i, queue),
            )
            for i in range(num_chunks)
        ]

        pool.close()  # No further tasks will be submitted.

        # Drain one progress token per chunk so the bar tracks completion.
        for _ in range(num_chunks):
            queue.get()
            pbar.update(1)

        pool.join()  # Wait for all workers to exit.
    finally:
        pbar.close()

    # Combine the per-chunk GeoDataFrames into one.
    filtered_chunks = [result.get() for result in results]
    filtered_data = gpd.GeoDataFrame(
        pd.concat(filtered_chunks, ignore_index=True)
    )
    return filtered_data


# Guarding the driver code is mandatory for multiprocessing: under the
# "spawn" start method (default on Windows/macOS) workers re-import this
# module, and without the guard each import would recursively launch the
# whole pipeline again.
if __name__ == "__main__":
    file_path = 'DATA/refined/geopackage/토지이용계획정보_국토교통부_경북_20240406.gpkg'
    pattern = r'아파트|공동주택'
    filtered_data = parallel_process(file_path, pattern, 8128)

    # Save the resulting GeoDataFrame.
    pyogrio.write_dataframe(filtered_data, "아파트구획_경북.gpkg")