Python客户端测试es的路由机制(routing)

  • 分类: Python
  • 发表日期:2021-07-13 14:58:00
  • 最后修改:2021-07-13 20:56:00

 

关于es routing的介绍,网上已经有很多介绍,这里不在赘述,只介绍使用过程中的一些注意事项及测试情况。

一、服务器概况

硬件平台:x86_64
机器硬件名:x86_64
系统处理器的体系结构:x86_64
内核版本:#1 SMP Fri May 8 10:59:10 UTC 2020
操作系统:GNU/Linux
操作系统的发行版号:4.18.0-193.el8.x86_64
Linux系统:CentOS Linux release 8.2.2004 (Core)
物理CPU个数:1
每个物理CPU核数:20
逻辑CPU个数:40
内存:32GB
彼时服务器资源占用情况:
捕获.PNG

 

二、导入数据

2.1 降低数据倾斜程度

注意这里说的是降低,在默认情况下 es 使用docid作为 routing key 将文档均匀分散在所有分片。

# 默认路由算法
shard = hash(routing) % number_of_primary_shards

而路由则是使用用户定义的routing key,具有相同routing key的文档,将被分配到同一个分片。

# 引入该参数以后路由算法
shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards

在实际开发中,可能存在某些routing key的文档数量很多,导致集群数据不平衡。
使用index.routing_partition_size可以将数据分配到分片的子集而不是单个分片。

# settings
{
    "index": {
        "number_of_shards": "30",
        "routing_partition_size": 10,
        "number_of_replicas": "0",
    }
}

这样做虽然可以使数据更均匀,但是搜索的分片也更多,所以请根据业务设置子集的大小。

2.2 文档 id 不再全局唯一

es 的每个分片都是一个倒排索引,索引 id 是唯一的。而 es 默认使用docid作为routing key,所以 es 只需要根据docid计算就能得知数据在哪个分片上。这也是为什么索引定义之后,分片数不能修改的原因。
而使用自定义路由,则可能存在docid一样,但是routing key不一样。例如某个索引将文档类型作为routing key,在首次插入时,routing key等于 A,被分配到了分片 1。在第二次插入时,routing key被修改等于 B,被分配到了分片 2。
这时如果不指定routing key查询,将会查出具有相同docid的两条数据。同样的,增加删除修改数据,都可能出现问题。所以使用自定义路由,只能用户自己保证docid的全局唯一性。
一种方法是,强制所有的增删改查操作都必须带有**routing**参数

# mappings
{
    "_routing": {
        "required": True 
    }
}

携带routing查询:

curl -XGET "http://localhost:9200/aiip.route_test/_search?routing=scenic&pretty" -H "Content-Type: application/json" -d '{"track_total_hits":true, "query":{"bool":{"filter":[{"match":{"title":"国家公园"}}]}}, "size": 3}'

routing key可以是多个,用逗号隔开即可。

2.3 Python客户端导入数据

使用elasticsearch.helpers.bulk提供的批量操作接口。
因业务不同,这里只贴出简化后的核心代码:

class EsHelper:

    def __init__(self, nodes:list=['http://127.0.0.1:9200'], pool=None, logger=None):
        self.pool = pool
        self.logger = logger

        if not self.pool:
            assert isinstance(nodes, list)
            self.conn = Elasticsearch(nodes, timeout=10)

            if not self.conn.ping():
                raise Exception('无法与es集群建立连接')

    def insert_init(self, size:int=500):
        self.bucket_ = []
        self.bulk_size = size
        self.bulk_total = 0

    def bulk(self, data):
        try:
            if self.pool:
                conn = self.pool.get_conn()
            else:
                conn = self.conn

            success, failed = helpers.bulk(
                conn, data, 
                stats_only=True, chunk_size=500, request_timeout=30
            )
            self.bulk_total += len(data)

        except:
            print(format_exc())
            if self.logger:
                self.logger.error(format_exc())

        finally:
            if self.pool:
                self.pool.put_conn(conn)

    @property
    def bucket(self):
        if self.bucket_:
            if len(self.bucket_) == self.bulk_size or self.bucket_[-1] == None:
                self.bucket_.pop()
                if self.bucket_:
                    self.bulk(self.bucket_)
                    self.bucket_ = []

        return self.bucket_
es_conn = EsHelper(pool=esconn_pool)
es_conn.insert_init()

for msg in file_or_kafka:
    _routing = msg['type']   # 根据业务设计 routing key

    action = {
        "_index": 'route_test'
        "_id": md5(msg["website"].strip().encode()).hexdigest(),   # 自定义 id
        "_routing": _routing,   # 多了这个字段
        "_source": msg
    }

    try:
        es_conn.bucket.append(action)
    except:
        from traceback import format_exc
        logger.error(format_exc())

es_conn.bucket.append(None)

 

三、测试

NO 总时间 ES 查询时间 MongoDB 查询时间 查询效果 备注
1 0.3753 0.365214 0.010013 正常 es查询100次mongo查询一次取12条数据,单位:秒/次。下同。
2 0.122578 0.113337 0.009176 正常  
3 0.189266 0.180636 0.008565 正常  

 

post
2021年7月9日 09:40 原创 草稿

针对情报平台的多种 es dsl 测试

post
2021年7月30日 12:01 原创
post
2021年7月30日 12:15 原创
post
2021年7月30日 15:07 原创
post
2021年7月30日 15:13 原创
post
2021年7月30日 15:18 原创
post
2021年7月30日 15:24 原创
post
2021年7月30日 16:09 原创
post
2021年7月30日 16:02 原创
post
2021年8月16日 15:28 原创
post
2021年8月16日 20:01
post
2021年8月17日 12:07 原创
post
2021年8月31日 15:42 原创
post
2021年10月8日 16:17
post
2021年10月13日 11:43
post
2021年10月21日 15:47 原创
post
2021年10月25日 11:27

0 评论

大哥整点话呗~