关于es routing的介绍,网上已经有很多介绍,这里不在赘述,只介绍使用过程中的一些注意事项及测试情况。
一、服务器概况
硬件平台:x86_64
机器硬件名:x86_64
系统处理器的体系结构:x86_64
内核版本:#1 SMP Fri May 8 10:59:10 UTC 2020
操作系统:GNU/Linux
操作系统的发行版号:4.18.0-193.el8.x86_64
Linux系统:CentOS Linux release 8.2.2004 (Core)
物理CPU个数:1
每个物理CPU核数:20
逻辑CPU个数:40
内存:32GB
彼时服务器资源占用情况:

二、导入数据
2.1 降低数据倾斜程度
注意这里说的是降低,在默认情况下 es 使用docid作为 routing key 将文档均匀分散在所有分片。
# 默认路由算法
shard = hash(routing) % number_of_primary_shards
而路由则是使用用户定义的routing key,具有相同routing key的文档,将被分配到同一个分片。
# 引入该参数以后路由算法
shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
在实际开发中,可能存在某些routing key的文档数量很多,导致集群数据不平衡。
使用index.routing_partition_size可以将数据分配到分片的子集而不是单个分片。
# settings
{
    "index": {
        "number_of_shards": "30",
        "routing_partition_size": 10,
        "number_of_replicas": "0",
    }
}
这样做虽然可以使数据更均匀,但是搜索的分片也更多,所以请根据业务设置子集的大小。
2.2 文档 id 不再全局唯一
es 的每个分片都是一个倒排索引,索引 id 是唯一的。而 es 默认使用docid作为routing key,所以 es 只需要根据docid计算就能得知数据在哪个分片上。这也是为什么索引定义之后,分片数不能修改的原因。
而使用自定义路由,则可能存在docid一样,但是routing key不一样。例如某个索引将文档类型作为routing key,在首次插入时,routing key等于 A,被分配到了分片 1。在第二次插入时,routing key被修改等于 B,被分配到了分片 2。
这时如果不指定routing key查询,将会查出具有相同docid的两条数据。同样的,增加删除修改数据,都可能出现问题。所以使用自定义路由,只能用户自己保证docid的全局唯一性。
一种方法是,强制所有的增删改查操作都必须带有**routing**参数
# mappings
{
    "_routing": {
        "required": True 
    }
}
携带routing查询:
curl -XGET "http://localhost:9200/aiip.route_test/_search?routing=scenic&pretty" -H "Content-Type: application/json" -d '{"track_total_hits":true, "query":{"bool":{"filter":[{"match":{"title":"国家公园"}}]}}, "size": 3}'
routing key可以是多个,用逗号隔开即可。
2.3 Python客户端导入数据
使用elasticsearch.helpers.bulk提供的批量操作接口。
因业务不同,这里只贴出简化后的核心代码:
class EsHelper:
    def __init__(self, nodes:list=['http://127.0.0.1:9200'], pool=None, logger=None):
        self.pool = pool
        self.logger = logger
        if not self.pool:
            assert isinstance(nodes, list)
            self.conn = Elasticsearch(nodes, timeout=10)
            if not self.conn.ping():
                raise Exception('无法与es集群建立连接')
    def insert_init(self, size:int=500):
        self.bucket_ = []
        self.bulk_size = size
        self.bulk_total = 0
    def bulk(self, data):
        try:
            if self.pool:
                conn = self.pool.get_conn()
            else:
                conn = self.conn
            success, failed = helpers.bulk(
                conn, data, 
                stats_only=True, chunk_size=500, request_timeout=30
            )
            self.bulk_total += len(data)
        except:
            print(format_exc())
            if self.logger:
                self.logger.error(format_exc())
        finally:
            if self.pool:
                self.pool.put_conn(conn)
    @property
    def bucket(self):
        if self.bucket_:
            if len(self.bucket_) == self.bulk_size or self.bucket_[-1] == None:
                self.bucket_.pop()
                if self.bucket_:
                    self.bulk(self.bucket_)
                    self.bucket_ = []
        return self.bucket_
es_conn = EsHelper(pool=esconn_pool)
es_conn.insert_init()
for msg in file_or_kafka:
    _routing = msg['type']   # 根据业务设计 routing key
    action = {
        "_index": 'route_test'
        "_id": md5(msg["website"].strip().encode()).hexdigest(),   # 自定义 id
        "_routing": _routing,   # 多了这个字段
        "_source": msg
    }
    try:
        es_conn.bucket.append(action)
    except:
        from traceback import format_exc
        logger.error(format_exc())
es_conn.bucket.append(None)
三、测试
| NO | 总时间 | ES 查询时间 | MongoDB 查询时间 | 查询效果 | 备注 | 
|---|---|---|---|---|---|
| 1 | 0.3753 | 0.365214 | 0.010013 | 正常 | es查询100次,mongo查询一次取12条数据,单位:秒/次。下同。 | 
| 2 | 0.122578 | 0.113337 | 0.009176 | 正常 | |
| 3 | 0.189266 | 0.180636 | 0.008565 | 正常 | 
                                    
                                                
                                                
                                                
                                                
                                                
0 评论
大哥整点话呗~