利用Elasticsearch实现地理位置、城市搜索服务

最近用到一些简单的地理位置查询接口,基于当前定位获取用户所在位置信息(省市区),然后基于该信息查询当前区域的......提供服务。

然后就自己研究了下GIS,作为一个程序员。自己能不能实现这个功能呢?答案当然是可以。立即开干。

思路:找到数据,写入数据库,利用Elasticsearch强大的搜索能力和丰富发热GIS数据处理能力实现。

GIS相关专业信息参考(bd上找到,还算专业):程序员GIS入门|前后端都要懂一点的GIS知识

经过一番寻找,“功夫不负有心人”,在网上找到了锐多宝 数据,比较完整。下载下来,格式是shape格式。

第一步:下载数据,从锐多宝下载

利用Elasticsearch实现地理位置、城市搜索服务

 第二步:写python脚本预处理数据:ShapFile 转 GeoJSON,ES处理GeoJSON比较强

import geopandas as gpd

# 读取 Shapefile
shapefile_path = 'D:/data/gis/2023年_CTAmap_1.12版/2023年省级/2023年省级.shp'
gdf = gpd.read_file(shapefile_path)

# 检查 GeoDataFrame
print(gdf.head())

# 如果需要,可以对数据进行预处理,比如过滤、选择特定列等
# gdf = gdf[['column1', 'column2', 'geometry']]

# 将 GeoDataFrame 转换为标准的 Pandas DataFrame (如果需要的话)
df = gdf.drop('geometry', axis=1).join(gdf['geometry'].apply(lambda x: gpd.GeoSeries(x).to_json()))

# 将 Pandas DataFrame 导出为 JSON 文件
output_json_path = 'D:/data/gis/2023-province-GeoJSON.gesjson'
# df.to_json(output_json_path, orient='records')

# 如果你想保留 GeoJSON 格式,可以直接保存 GeoDataFrame
gdf.to_file(output_json_path, driver='GeoJSON')

利用Elasticsearch实现地理位置、城市搜索服务

第三步:利用Python脚本将GeoJSON写入Elasticsearch

from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk import json  # 连接到 Elasticsearch es = Elasticsearch("http://localhost:9200")  # 检查连接 if not es.ping():     raise ValueError("Connection failed")  # 删除旧索引(如果存在) if es.indices.exists(index="province2023_geoshape_index_001"):     es.indices.delete(index="province2023_geoshape_index_001")  # 创建索引并定义 Mapping mapping = {     "mappings": {         "properties": {             "location": {                 "type": "geo_shape"             },             "name": {                 "type": "text"             }         }     } }  # 创建索引 es.indices.create(index="province2023_geoshape_index_001", body=mapping)  # 读取 GeoJSON 文件 with open("D:/data/gis/2023-province-GeoJSON.gesjson", "r", encoding="utf-8") as file:     geojson_data = json.load(file)  # 提取 GeoJSON 特征集合 features = geojson_data.get("features", [])  # 准备数据以供导入 documents = [] for feature in features:     doc = {         "location": {             "type": feature["geometry"]["type"],             "coordinates": feature["geometry"]["coordinates"]         }     }     if "properties" in feature:         doc.update(feature["properties"])     documents.append(doc)  # 定义批量大小 batch_size = 100  # 每次批量导入的数量   # 准备 actions def generate_actions(documents):     for doc in documents:         yield {             "_index": "province2023_geoshape_index_001",             "_source": doc         }   # 分批执行批量导入 for i in range(0, len(documents), batch_size):     end = min(i + batch_size, len(documents))     success, _ = bulk(es, generate_actions(documents[i:end]))     print(f"Bulk {i}-{end} completed, {success} documents indexed.")  print("All data indexed.")

第四步:计算出每条数据的区域的中心点(扩展功能,原始数据只有polygon多边形数据)

from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk import json import ssl  # 连接到 Elasticsearch es = Elasticsearch("http://localhost:9200")  # 检查连接 if not es.ping():     raise ValueError("Connection failed")  # 删除旧索引(如果存在) if es.indices.exists(index="province2023_centroid_geoshape_index_001"):     es.indices.delete(index="province2023_centroid_geoshape_index_001")  # 创建索引并定义 Mapping mapping = {     "mappings": {         "properties": {             "location": {                 "type": "geo_shape"             },             "centroid": {  # 新增字段                 "type": "geo_point"             },             "name": {                 "type": "text"             }         }     } }  # 创建索引 es.indices.create(index="province2023_centroid_geoshape_index_001", body=mapping)  # 读取 GeoJSON 文件 with open("D:/data/gis/2023-province-GeoJSON.gesjson", "r", encoding="utf-8") as file:     geojson_data = json.load(file)  # 提取 GeoJSON 特征集合 features = geojson_data.get("features", [])   def calculate_centroid(polygons):     total_area = 0.0     total_x = 0.0     total_y = 0.0      for polygon in polygons:         # 现在 polygon 是一个包含多个坐标的列表         centroid = calculate_simple_polygon_centroid(polygon)         area = calculate_polygon_area(polygon)          total_area += area         total_x += centroid[0] * area         total_y += centroid[1] * area      if total_area == 0:         # 如果总面积为零,则返回原点作为中心点         return [0, 0]     else:         return [total_x / total_area, total_y / total_area]   # is_coordinates_list方法 # 以下结构返回True,polygon 是一个包含坐标列表的列表 # [ #     [[x1, y1], [x2, y2], [x3, y3], ...], #     [[x1, y1], [x2, y2], [x3, y3], ...]  # 如果有内部孔洞 # ] # 以下结构返回Fasle,包含单个坐标的列表 # [ #     [x1, y1], #     [x2, y2], #     [x3, y3], #     ... # ]  def is_coordinate(coord):     return (             isinstance(coord, (list, tuple)) and             len(coord) == 2 and             all(isinstance(c, (int, float)) for c in coord)     )   def is_coordinates_list(coords):     # 检查 coords 是否是一个包含坐标列表的列表     if isinstance(coords, list):         if all(isinstance(c, list) and all(is_coordinate(coord) for coord in c) for c in coords):             return True     return False   def calculate_simple_polygon_centroid(polygon):     # 确定 polygon 的结构     if is_coordinates_list(polygon):         # polygon 是一个包含坐标列表的列表         x_sum = sum(coord[0] for coord in polygon[0])         y_sum = sum(coord[1] for coord in polygon[0])         num_points = len(polygon[0])     else:         # print(False, polygon[0])         # polygon 是一个包含多个坐标的列表         x_sum = sum(coord[0] for coord in polygon)         y_sum = sum(coord[1] for coord in polygon)         num_points = len(polygon)     # 计算平均坐标     centroid_x = x_sum / num_points     centroid_y = y_sum / num_points      return [centroid_x, centroid_y]   def calculate_polygon_area(polygon):     # 计算简单多边形的面积     area = 0.0     if is_coordinates_list(polygon):  # polygon 是一个包含坐标列表的列表         num_points = len(polygon[0])         for i in range(num_points):             j = (i + 1) % num_points             area += polygon[0][i][0] * polygon[0][j][1]             area -= polygon[0][j][0] * polygon[0][i][1]      else:  # polygon 是一个包含多个坐标的列表         num_points = len(polygon)         for i in range(num_points):             j = (i + 1) % num_points             area += polygon[i][0] * polygon[j][1]             area -= polygon[j][0] * polygon[i][1]     return abs(area) / 2.0   # 准备数据以供导入 documents = [] for feature in features:     # 检查坐标是否在有效范围内     coordinates = feature["geometry"]["coordinates"]     centroid = calculate_centroid(coordinates)      doc = {         "location": {             "type": feature["geometry"]["type"],             "coordinates": coordinates         },         "centroid": centroid,  # 添加中心点     }     if "properties" in feature:         doc.update(feature["properties"])     documents.append(doc)  # 定义批量大小 batch_size = 100  # 每次批量导入的数量   # 准备 actions def generate_actions(documents):     for doc in documents:         yield {             "_index": "district2023_centroid_geoshape_index_001",             "_source": doc         }   # 分批执行批量导入 for i in range(0, len(documents), batch_size):     end = min(i + batch_size, len(documents))     success, errors = bulk(es, generate_actions(documents[i:end]))     if errors:         print(f"Bulk {i}-{end} completed, {success} documents indexed, but {len(errors)} documents failed.")         for error in errors:             print(error)     else:         print(f"Bulk {i}-{end} completed, {success} documents indexed.")  print("All data indexed.")

第五步:利用elasticsearch的pipeline和reindex能力预处理数据

# geo_centroid 聚合是一种高级聚合,它可以计算一组地理位置的中心点。在 Elasticsearch 中,这个功能属于高级特性,通常只在 X-Pack(现在称为 Elastic Security 和 Elastic Observability)的许可证中可用。 # 试用30天可以体验 POST /province2023_geoshape_index_001/_search {   "size": 0,   "aggs": {     "centroid": {       "geo_centroid": {         "field": "location"       }     }   } }   POST province2023_centroid_geoshape_index_001/_search {   "query": {     "term": {       "省.keyword": {         "value": "陕西省"        }     }   } }  PUT _ingest/pipeline/copy_field_pipeline {   "description": "Copy the value of one field to another",   "processors": [     {       "copy": {         "from": "",          "to": "province_name"       }     }   ] } GET province2023_centroid_geoshape_index_001/_mapping  GET province2023_centroid_geoshape_index_001/_mapping   PUT _ingest/pipeline/province_multiple_copy_fields_pipeline {   "description": "Copy multiple fields to new fields and rename fields to new fields",   "processors": [     {       "set": {         "field": "province_name",         "value": "{{{省}}}"       }     },     {       "remove": {         "field": ""       }     },     {       "rename": {         "field": "省级码",         "target_field": "province_code"       }     },     {       "rename": {         "field": "省类型",         "target_field": "province_type"       }     },     {       "rename": {         "field": "VAR_NAME",         "target_field": "var_name"       }     },     {       "rename": {         "field": "ENG_NAME",         "target_field": "eng_name"       }     },     {       "rename": {         "field": "FIRST_GID",         "target_field": "first_gid"       }     },     {       "rename": {         "field": "FIRST_TYPE",         "target_field": "first_type"       }     }   ] }  GET province2023_centroid_geoshape_index_002/_count  GET province2023_centroid_geoshape_index_002/_mapping DELETE province2023_centroid_geoshape_index_002  PUT province2023_centroid_geoshape_index_002 {   "mappings": {     "properties": {       "eng_name": {         "type": "text",         "fields": {           "keyword": {             "type": "keyword",             "ignore_above": 256           }         }       },       "first_gid": {         "type": "text",         "fields": {           "keyword": {             "type": "keyword",             "ignore_above": 256           }         }       },       "first_type": {         "type": "text",         "fields": {           "keyword": {             "type": "keyword",             "ignore_above": 256           }         }       },       "var_name": {         "type": "text",         "fields": {           "keyword": {             "type": "keyword",             "ignore_above": 256           }         }       },       "centroid": {         "type": "geo_point"       },       "location": {         "type": "geo_shape"       },       "name": {         "type": "text"       },       "year": {         "type": "text",         "fields": {           "keyword": {             "type": "keyword",             "ignore_above": 256           }         }       }     }   } }  POST _reindex {   "source": {     "index": "province2023_centroid_geoshape_index_001"   },   "dest": {     "index": "province2023_centroid_geoshape_index_002",     "pipeline": "province_multiple_copy_fields_pipeline"   } }  GET province2023_centroid_geoshape_index_002/_search

第六步:查询数据 geo_distance

# centroid字段的type是 geo_point,存储的经纬度形式是数组Geopoint as an array # geo_bounding_box 可查找边框内的所有地理坐标点。 POST province2023_centroid_geoshape_index_002/_search {   "query": {     "geo_bounding_box": {        "centroid": {         "top_left": {           "lat": 42,           "lon": -72         },         "bottom_right": {           "lat": 40,           "lon": -74         }       }     }   } }  POST province2023_centroid_geoshape_index_002/_search {   "query": {     "geo_distance": {       "distance": 100,       "centroid": {         "lat": 40.09937484066758,         "lon": 116.41960604340115       }     }   } }  POST province2023_centroid_geoshape_index_002/_search {   "query": {     "bool": {       "must": {         "match": {           "province_name":"xx市"         }       },       "filter": {         "geo_distance": {           "distance": "2km",           "centroid": {             "lat": 40.09937484066758,             "lon": 116.41960604340115           }         }       }     }   } }   POST province2023_centroid_geoshape_index_002/_search {   "query": {     "bool": {       "must": {         "match": {           "province_name":"xx市"         }       },       "filter": {         "geo_distance": {           "distance": "200km",           "location": {             "lat": 40.09937484066758,             "lon": 116.41960604340115           }         }       }     }   } }

 

发表评论

评论已关闭。

相关文章