#!/usr/bin/env python # coding: utf-8 # # ハイブリッド検索 (Amazon SageMaker 編) # ## 概要 # 本ラボでは、テキスト検索、ベクトル検索、スパース検索を組み合わせたハイブリッド検索を実装していきます。 # # ### 前提事項 # 本ラボの実施にあたっては、以下のラボを事前に完了している必要があります。これらのラボで作成したインデックスを元にハイブリッド検索を実装するためです。まだ未実施の場合は、項番に沿ってラボを完了させてください。 # # 1. [ニューラル検索の実装 (Amazon SageMaker 編)](../vector-search/neural-search-with-sagemaker.ipynb) # 1. [ニューラルスパース検索の実装 (Amazon SageMaker 編)](../sparse-search/neural-sparse-search-with-sagemaker.ipynb) # # ### ハイブリッド検索の概要 # ハイブリッド検索は、複数の検索を実行し、各結果をマージ、スコアを平準化したうえでランク付けを行う機能です。 # # OpenSearch では [Hybrid query][hybrid-search] と [Normalization processor][normalization-processor] を組み合わせることでハイブリッド検索を実装することができます。 # # Hybrid query は複数のクエリを実行した結果を組み合わせるものです。単に Hybrid query を実行するだけでは、個々のクエリごとのスコア計算方法やベースのスコア値が大きく異なることで偏った結果となるため、Normalization processor によりスコアの平準化を行います。 # # 以下はハイブリッド検索の処理フローです # # # # [hybrid-search]: https://opensearch.org/docs/latest/search-plugins/hybrid-search/ # [normalization-processor]: https://opensearch.org/docs/latest/search-plugins/search-pipelines/normalization-processor/ # ## 事前作業 # ### パッケージインストール # In[10]: get_ipython().system('pip install opensearch-py requests-aws4auth --quiet') # ### インポート # In[11]: import boto3 import json import time import logging import pandas as pd import numpy as np from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth # ### ヘルパー関数の定義 # 以降の処理を実行する際に必要なヘルパー関数を定義しておきます。 # In[12]: def search_cloudformation_output(stackname, key): cloudformation_client = boto3.client("cloudformation", region_name=default_region) for output in cloudformation_client.describe_stacks(StackName=stackname)["Stacks"][0]["Outputs"]: if output["OutputKey"] == key: return output["OutputValue"] raise ValueError(f"{key} is not found in outputs of {stackname}.") def search_sagemaker_inference_endpoint(model_id, region): sagemaker_client = boto3.client("sagemaker", region_name=region) response = sagemaker_client.search( Resource="Endpoint", SearchExpression={ "Filters": [ { "Name": "EndpointName", "Operator": "Contains", "Value": model_id }, ], }, SortBy="LastModifiedTime", SortOrder="Descending", MaxResults=1, ) return response["Results"] # ### 共通変数のセット # In[13]: default_region = boto3.Session().region_name logging.getLogger().setLevel(logging.ERROR) # ## ハイブリッド検索の事前準備 # ハイブリッド検索を行うにあたって Normalization Processor を呼び出す search pipeline の作成が必要となります。以下で作成を行います。 # ### OpenSearch 関連リソースの作成 # #### OpenSearch クライアントの作成 # ドメイン(クラスター)に接続するためのエンドポイント情報を CloudFormation スタックの出力から取得し、OpenSearch クライアントを作成します。 # In[14]: cloudformation_stack_name = "search-lab-jp" opensearch_cluster_endpoint = search_cloudformation_output(cloudformation_stack_name, "OpenSearchDomainEndpoint") credentials = boto3.Session().get_credentials() service_code = "es" auth = AWSV4SignerAuth(credentials=credentials, region=default_region, service=service_code) opensearch_client = OpenSearch( hosts=[{"host": opensearch_cluster_endpoint, "port": 443}], http_compress=True, http_auth=auth, use_ssl=True, verify_certs=True, connection_class = RequestsHttpConnection ) opensearch_client # OpenSearch クラスターへのネットワーク接続性が確保されており、OpenSearch の Security 機能により API リクエストが許可されているかを確認します。 # レスポンスに cluster_name や cluster_uuid が含まれていれば、接続確認が無事完了したと判断できます # In[15]: opensearch_client.info() # #### Normalization processor を含む search Pipeline 作成 # 各検索のスコアの重みづけは weights 要素内で実施しています。ここで定義した weights は Hybrid query 実行時に指定する queries 内の各検索条件と対応しています。 # # まずは、[Neural search の実装 (Amazon SageMaker 編)](./neural-search-with-sagemaker.ipynb) で作成した検索パイプラインをベースに Normalization processor を含むパイプラインを作成します。作成にあたって、過去のパイプラインの設定を流用します。 # # In[16]: embedding_model_id_on_hf = "BAAI/bge-m3" embedding_model_name = embedding_model_id_on_hf.lower().replace("/", "-") neural_search_pipeline_id = f"{embedding_model_name}_neural_search_query" response = opensearch_client.http.get("/_search/pipeline/" + neural_search_pipeline_id) request_processors = response[neural_search_pipeline_id]["request_processors"] embedding_model_id = request_processors[0]["neural_query_enricher"]["default_model_id"] print(f"embedding model id: {embedding_model_id}") sparse_encoding_model_id_on_hf = "hotchpotch/japanese-splade-v2" sparse_encoding_model_name = sparse_encoding_model_id_on_hf.lower().replace("/", "-") neural_sparse_search_pipeline_id = f"{sparse_encoding_model_name}_neural_sparse_search_query" response = opensearch_client.http.get("/_search/pipeline/" + neural_sparse_search_pipeline_id) request_processors = response[neural_sparse_search_pipeline_id]["request_processors"] sparse_encoding_model_id = request_processors[1]["neural_query_enricher"]["default_model_id"] print(f"sparse encoding model id: {sparse_encoding_model_id}") neural_sparse_two_phase_processor = request_processors[0] # In[17]: sparse_encoding_model_id_on_hf = "hotchpotch/japanese-splade-v2" sparse_encoding_model_name = sparse_encoding_model_id_on_hf.lower().replace("/", "-") neural_sparse_search_pipeline_id = f"{sparse_encoding_model_name}_neural_sparse_search_query" response = opensearch_client.http.get("/_search/pipeline/" + neural_sparse_search_pipeline_id) print(response) #request_processors = response[neural_sparse_search_pipeline_id]["request_processors"] #sparse_encoding_model_id = request_processors[1]["neural_query_enricher"]["default_model_id"] #print(f"sparse encoding model id: {sparse_encoding_model_id}") #neural_sparse_two_phase_processor = request_processors[0] # 上記で取得したモデル ID に加えて、Phase results processors に [Normalization processor][normalization-processor] を追加します。 # # [normalization-processor]: https://opensearch.org/docs/latest/search-plugins/search-pipelines/normalization-processor/ # In[18]: payload={ "request_processors": [ neural_sparse_two_phase_processor, { "neural_query_enricher" : { "default_model_id": embedding_model_id, "neural_field_default_id": { "question_sparse_embedding" : sparse_encoding_model_id, "context_sparse_embedding" : sparse_encoding_model_id } } } ], "phase_results_processors": [ { "normalization-processor": { "normalization": { "technique": "min_max" }, "combination": { "technique": "arithmetic_mean", "parameters": { "weights": [ 0.7, 0.3 ] } } } } ] } # パイプライン ID の指定 hybrid_search_pipeline_id = f"{embedding_model_name}_{sparse_encoding_model_name}_hybrid_search" # パイプライン作成 API の呼び出し response = opensearch_client.http.put("/_search/pipeline/" + hybrid_search_pipeline_id, body=payload) print(response) response = opensearch_client.http.get("/_search/pipeline/" + hybrid_search_pipeline_id) print(response) # ## ハイブリッド検索の実行 # ### テキスト検索をベクトル検索で補う # テキスト検索はクエリに厳密にマッチするドキュメントを取得可能です。一方でクエリに厳密にマッチしないものの、意図としては近いドキュメントまでは拾うことができません。 # In[19]: index_name = "jsquad-neural-search" query = "日本で梅雨がない地域は?" payload = { "query": { "match": { "question": { "query": query, "operator": "and" } } }, "_source": False, "fields": ["question", "answers", "context"], "size": 10 } response = opensearch_client.search( index=index_name, body=payload ) pd.json_normalize(response["hits"]["hits"]) # 他方、ベクトル検索は意味的に近いドキュメントを検索することに長けています # In[20]: index_name = "jsquad-neural-search" query = "日本で梅雨がない地域は?" payload = { "size": 10, "query": { "neural": { "question_embedding": { "query_text": query, "k": 10 } } }, "_source" : False, "fields": ["question", "answers", "context"] } response = opensearch_client.search( index=index_name, body=payload, search_pipeline = hybrid_search_pipeline_id ) pd.json_normalize(response["hits"]["hits"]) # 両者を組み合わせることで、意味的に近い検索結果もフォローしつつ、テキスト検索でマッチするドキュメントについてはよりスコアを上げる = 上位にランク付けすることが可能となります。 # In[21]: get_ipython().run_cell_magic('time', '', 'index_name = "jsquad-neural-search"\nquery = "日本で梅雨がない地域は?"\n\npayload = {\n "size": 10,\n "query": {\n "hybrid": {\n "queries": [\n {\n "match": {\n "question": {\n "query": query,\n "operator": "and"\n }\n }\n },\n {\n "neural": {\n "question_embedding": {\n "query_text": query, # テキストをベクトルに変換し\n "k": 10 # クエリベクトルに近いベクトルのうち上位 10 件を返却\n }\n }\n }\n ]\n }\n },\n "_source" : False,\n "fields": ["question", "answers", "context"]\n}\n# 検索 API を実行\nresponse = opensearch_client.search(\n body = payload,\n index = index_name,\n filter_path = "hits.hits",\n search_pipeline = hybrid_search_pipeline_id \n)\n\n# 結果を表示\npd.json_normalize(response["hits"]["hits"])\n') # ベクトル検索によって意図しない結果が付与されてしまう場合は、別のラボで解説するリランキングや、k ではなく min_score によるフィルタリングが有効です # # In[22]: get_ipython().run_cell_magic('time', '', 'index_name = "jsquad-neural-search"\nquery = "日本で梅雨がない地域は?"\npayload = {\n "size": 10,\n "query": {\n "hybrid": {\n "queries": [\n {\n "match": {\n "question": {\n "query": query,\n "operator": "and"\n }\n }\n },\n {\n "neural": {\n "question_embedding": {\n "query_text": query, # テキストをベクトルに変換し\n "min_score": 0.7\n }\n }\n }\n ]\n }\n },\n "_source" : False,\n "fields": ["question", "answers", "context"]\n}\n# 検索 API を実行\nresponse = opensearch_client.search(\n body = payload,\n index = index_name,\n filter_path = "hits.hits",\n search_pipeline = hybrid_search_pipeline_id \n)\n\n# 結果を表示\npd.json_normalize(response["hits"]["hits"])\n') # ### ベクトル検索をテキスト検索で補う # ベクトル検索は意味的に近い文書を検索することに長けていますが、反面厳密なマッチングができないケースがあります。例えば、製品の型番などの業務固有のパラメーターでの検索は不得手です。 # # 以下のように "M" だけを検索対象としてみると、ベクトル検索は無関係の結果を返却します。 # In[23]: get_ipython().run_cell_magic('time', '', 'index_name = "jsquad-neural-search"\nquery = "M"\npayload = {\n "size": 10,\n "query": {\n "neural": {\n "question_embedding": {\n "query_text": query, \n "k": 10,\n }\n }\n },\n "_source" : False,\n "fields": ["question", "answers", "context"]\n}\n# 検索 API を実行\nresponse = opensearch_client.search(\n body = payload,\n index = index_name,\n filter_path = "hits.hits",\n search_pipeline = hybrid_search_pipeline_id \n)\n\n# 結果を表示\npd.json_normalize(response["hits"]["hits"])\n') # 一方、テキスト検索の方は M を含む結果を返します。 # In[24]: index_name = "jsquad-neural-search" query = "M" payload = { "query": { "match": { "question": { "query": query, "operator": "and" } } }, "_source": False, "fields": ["question", "answers", "context"], "size": 10 } response = opensearch_client.search( index=index_name, body=payload ) pd.json_normalize(response["hits"]["hits"]) # ベクトル検索単体からハイブリッド検索に切り替えることで、検索精度の向上を達成できました。 # In[25]: get_ipython().run_cell_magic('time', '', 'index_name = "jsquad-neural-search"\nquery = "M"\npayload = {\n "size": 10,\n "query": {\n "hybrid": {\n "queries": [\n {\n "match": {\n "question": {\n "query": query,\n "operator": "and"\n }\n }\n },\n {\n "neural": {\n "question_embedding": {\n "query_text": query, # テキストをベクトルに変換し\n "min_score": 0.7\n }\n }\n }\n ]\n }\n },\n "_source" : False,\n "fields": ["question", "answers", "context"]\n}\n# 検索 API を実行\nresponse = opensearch_client.search(\n body = payload,\n index = index_name,\n filter_path = "hits.hits",\n search_pipeline = hybrid_search_pipeline_id \n)\n\n# 結果を表示\npd.json_normalize(response["hits"]["hits"])\n') # ### テキスト検索とスパース検索のハイブリッド検索 # ベクトル検索の代わりに、スパース検索でテキスト検索を補完することも可能です。 # In[26]: get_ipython().run_cell_magic('time', '', 'index_name = "jsquad-neural-sparse-search"\nquery = "日本で梅雨がない地域は?"\n\npayload = {\n "size": 10,\n "query": {\n "hybrid": {\n "queries": [\n {\n "match": {\n "question": {\n "query": query,\n "operator": "and"\n }\n }\n },\n {\n "neural_sparse": {\n "question_sparse_embedding": {\n "query_text": query\n }\n }\n }\n ]\n }\n },\n "_source" : False,\n "fields": ["question", "answers", "context"]\n}\n# 検索 API を実行\nresponse = opensearch_client.search(\n body = payload,\n index = index_name,\n filter_path = "hits.hits",\n search_pipeline = hybrid_search_pipeline_id \n)\n\n# 結果を表示\npd.json_normalize(response["hits"]["hits"])\n') # ### ベクトル検索とスパース検索のハイブリッド検索 # ベクトル検索とスパース検索を組み合わせることも可能です。 # # 現時点ではベクトルとトークンリスト両方のデータを持つインデックスが存在しないため、jsquad-hybrid-search というインデックスを作成し、jsquad-neural-search からデータを jsquad-hybrid-search に Reindex API で移行します。移行中に、Ingestion pipeline を使用した Sparse encoding の作成と埋め込みも合わせて実施します。 # #### インデックスの作成 # In[27]: payload = { "mappings": { "properties": { "id": {"type": "keyword"}, "question": {"type": "text", "analyzer": "custom_sudachi_analyzer"}, "context": {"type": "text", "analyzer": "custom_sudachi_analyzer"}, "answers": {"type": "text", "analyzer": "custom_sudachi_analyzer"}, "question_embedding": { "type": "knn_vector", "dimension": 1024, "space_type": "l2", "method": { "name": "hnsw", "engine": "faiss", } }, "context_embedding": { "type": "knn_vector", "dimension": 1024, "space_type": "l2", "method": { "name": "hnsw", "engine": "faiss", }, }, "question_sparse_embedding": { "type": "rank_features" }, "context_sparse_embedding": { "type": "rank_features" }, } }, "settings": { "index.knn": True, "index.number_of_shards": 1, "index.number_of_replicas": 0, "analysis": { "analyzer": { "custom_sudachi_analyzer": { "char_filter": ["icu_normalizer"], "filter": [ "sudachi_normalizedform", "custom_sudachi_part_of_speech" ], "tokenizer": "sudachi_tokenizer", "type": "custom" } }, "filter": { "custom_sudachi_part_of_speech": { "type": "sudachi_part_of_speech", "stoptags": ["感動詞,フィラー","接頭辞","代名詞","副詞","助詞","助動詞","動詞,一般,*,*,*,終止形-一般","名詞,普通名詞,副詞可能"] } } } } } # インデックス名を指定 index_name = "jsquad-hybrid-search" try: # 既に同名のインデックスが存在する場合、いったん削除を行う print("# delete index") response = opensearch_client.indices.delete(index=index_name) print(json.dumps(response, indent=2)) except Exception as e: print(e) # インデックスを作成 response = opensearch_client.indices.create(index_name, body=payload) response # #### パイプラインの取得 # In[28]: ingestion_pipeline_id = f"{sparse_encoding_model_name}_neural_sparse_search_ingestion" response = opensearch_client.http.get("/_ingest/pipeline/" + ingestion_pipeline_id) print(response) # #### Reindex API によるデータコピー + Ingest Pipeline による Sparse encoding # In[29]: payload = { "source":{ "index":"jsquad-neural-search", "size": 20 }, "dest":{ "index":"jsquad-hybrid-search", "pipeline": ingestion_pipeline_id } } response = opensearch_client.reindex(body=payload, slices="1", wait_for_completion=False) task_id = response["task"] print(json.dumps(response, indent=2, ensure_ascii=False)) # In[30]: response = opensearch_client.tasks.get(task_id=task_id) while response["completed"] == False: time.sleep(1) response = opensearch_client.tasks.get(task_id=task_id) print(json.dumps(response, indent=2, ensure_ascii=False)) # ドキュメントを 1 件取得し、Sparse encoding により生成されたトークンリストとベクトルの両方が格納されていることを確認します。 # In[31]: get_ipython().run_cell_magic('time', '', 'index_name = "jsquad-hybrid-search"\n\npayload = {\n "size": 1,\n "query": {\n "match_all": {}\n },\n}\n# 検索 API を実行\nresponse = opensearch_client.search(\n body = payload,\n index = index_name,\n)\n\nprint(json.dumps(response, indent=2, ensure_ascii=False))\n') # ハイブリッドクエリを実際に実行し、Sparse neural search と Sparse search によるハイブリッド検索が実行できることを確認します。 # In[32]: get_ipython().run_cell_magic('time', '', 'index_name = "jsquad-hybrid-search"\nquery = "M"\n\npayload = {\n "size": 10,\n "query": {\n "hybrid": {\n "queries": [\n {\n "neural_sparse": {\n "question_sparse_embedding": {\n "query_text": query\n }\n }\n },\n {\n "neural": {\n "question_embedding": {\n "query_text": query, # テキストをベクトルに変換し\n "min_score": 0.7\n }\n }\n }\n ]\n }\n },\n "_source" : False,\n "fields": ["question", "answers", "context"]\n}\n# 検索 API を実行\nresponse = opensearch_client.search(\n body = payload,\n index = index_name,\n filter_path = "hits.hits",\n search_pipeline = hybrid_search_pipeline_id \n)\n\n# 結果を表示\npd.json_normalize(response["hits"]["hits"])\n') # ## まとめ # ハイブリッド検索で様々な検索を組み合わせられることを確認してきました。ハイブリッド検索は必ずしもベクトル検索との組み合わせが必須ではなく、テキスト検索同士の組み合わせも可能です。様々な場面での利用を検討してみてください。 # # 時間がある方は、続いて以下のラボも実施してみましょう。 # # - [セマンティックリランキング (Amazon SageMaker 編)](../reranking/semantic-reranking-with-sagemaker.ipynb) # ## 後片付け # ### インデックス削除 # In[33]: index_name = "jsquad-hybrid-search" try: response = opensearch_client.indices.delete(index=index_name) print(json.dumps(response, indent=2)) except Exception as e: print(e)