SparkとMilvusによるプロダクション対応検索パイプラインの構築
スケーラブルなベクトル検索パイプラインを本番環境で構築するのは、プロトタイプを作るほど簡単ではない。プロトタイプの段階では、少量の非構造化データしか扱わないことが多い。しかし、プロトタイプを本番環境に移行する際には、通常、数百万から数十億の非構造化データと大量のクエリを扱う必要がある。そのため、データ取り込みや情報検索などの一般的なベクトル検索パイプライン操作を効率的に実行するための堅牢なソリューションが必要となる。
最近の講演で、Zillizのエコシステム&AIプラットフォーム責任者であるJiang Chen氏は、効率的で本番環境に即したベクトル検索パイプラインを構築するためのステップバイステップのプロセスを紹介した。この記事では、3つのトピックからなる講演の主なポイントについて説明する:
従来型およびRetrieval Augmented Generation (RAG)設定における情報検索のワークフロー。
MilvusとSparkを使ったRAGシステムにおけるスケーラブルなベクトル検索パイプラインの構築。
RAGシステムの品質向上のためのアドバイス。
早速、最初のメイントピックについて説明しよう。まずは、従来の設定における情報検索のワークフローから。
伝統的な情報検索のワークフロー
ディープラーニングが進歩する以前、伝統的な情報検索や検索システムは、タグと手作業によるラベリングに大きく依存していた。オンラインショップの場合を考えてみよう。オンラインショップは、顧客のニーズに応じて最適な商品を提供するために、商品タグに依存していた。そのため、オンラインショップは、毎日大量の顧客からの問い合わせに対応できるような、スケーラブルで効率的な検索パイプラインを必要としている。
この需要に対応するため、従来の検索システムのアーキテクチャは通常、オフラインのデータ取り込み用とオンラインのクエリ提供用の2つのコンポーネントに分かれている。
オフラインデータ取り込み
オフラインデータインジェストの主な目的は、すべてのデータをデータベースにロードすることである。このプロセスの最初のステップとして、社内文書やインターネットなど、1つまたは複数のソースからデータを収集する。データを取り込むことができたら、データのタグ付けを続けることができる。最後に、タグ付けされたデータにインデックスを付け、データベースにロードすることができる。
オンラインショップの例を使って、ワークフローを説明しよう。ウェブをクロールすることで、インターネットから商品の説明を入手することができる。次に、商品説明を入手したら、「clothes」、「dress」、「formal dress」、「party dress」など、商品説明を表すタグを作成する。次に、タグ、商品説明、価格、その他のメタデータのインデックスを作成し、構造化されていないデータベースに読み込みます。最後に、このデータベースを配信環境にプッシュする。
伝統的な情報検索アーキテクチャの2つの構成要素](https://assets.zilliz.com/Two_components_of_a_traditional_information_search_architecture_7a2cdae76d.png)
オンラインクエリーサービング
つ目のコンポーネントの主な目的は、顧客からのクエリに対応し、前のワークフローで作成したデータベースから情報検索を実行することである。
このプロセスは、フロントエンドとクエリコンパイラから始まり、ユーザーのクエリをタグのセットに合成する。次に、システムは生成されたタグを類似検索の入力として使用する。そして、最も類似したキーワードまたはタグを持つデータベース内の上位kエントリーがフェッチされ、ユースケースに応じて異なるアルゴリズムを使ってランク付けされる。ランク付けされた結果は、最終的にユーザーに返される。
従来の情報検索と検索システムの主な欠点は、意味理解の欠如である。タグや手作業で作成されたラベルは、意味的な意味やユーザーのクエリの意図を捉えることができず、不正確な検索結果につながる可能性がある。また、膨大な量のデータがある場合、各エントリーに手作業でタグを付けるのは面倒である。
新しい情報検索としてのRAGのワークフロー
ディープラーニングの急速な進歩は、情報検索プロセスの状況を大きく変えた。埋め込みモデルや、GPT、Claude、LLAMA、Mistralのような大規模言語モデル(LLM)の助けを借りて、ユーザーのクエリの意味的な意味を効果的に捉えることができ、データ入力ごとにラベルやタグを手作業で作成する必要がなくなる。
埋め込みモデルを使えば、非構造化データは、n次元ベクトルからなるベクトル埋め込みに変換できる。埋め込みデータの次元数は、使用するモデルに依存する。これらの埋め込みは、それらが表現するデータの意味的な意味を持ち、したがって、任意の2つの埋め込み間の類似性は、余弦距離のようなメトリックスを用いて容易に計算することができる。直感的には、似たような意味を持つ埋め込みは、ベクトル空間において互いに近くに配置される。
2次元ベクトル空間において類似した意味を持つ埋め込みベクトルの例](https://assets.zilliz.com/Example_of_vector_embeddings_that_carry_similar_semantic_meaning_in_a_2_D_vector_space_d6ca900d3a.png)
埋め込みができれば、それを直接Milvusのようなベクトルデータベースに取り込むことができ、データの取り込みが完了する。その後、情報検索を行うことができる。
ユーザからのクエリを受け取ると、データ取り込み部と同じ埋め込みモデルを用いて埋め込みに変換する。次に、パイプラインはベクトル検索を実行し、データベースから上位k個の最も類似した埋め込みを取得する。Retrieval Augmented Generation (RAG)のコンテキストでは、これらの類似埋め込みは、ユーザのクエリに答えるLLMのコンテキストとして使用されます。
RAGワークフロー](https://assets.zilliz.com/RAG_workflow_c37dbaa213.png)
SparkとMilvusによる## ベクトル検索パイプライン
RAGは、ベクトル検索から取得した関連するコンテキストをLLMに提供することで、LLMが生成する回答の精度を向上させる新しいアプローチである。しかし、RAGアプリケーションの構築はスケーラビリティの問題から難しい。
本番環境でRAGアプリケーションをデプロイする場合、数百万から数十億の非構造化データを扱う可能性が高い。さらに、RAGシステムは顧客から数千、あるいはそれ以上のクエリを受け取ることになります。したがって、これらの問題を効果的に処理するためには、効率的でスケーラブルなソリューションが必要であり、Apache Sparkが役立つのはこの点である。
このセクションでは、MilvusとSparkを使って検索パイプラインを構築する。Milvusはオープンソースのベクトルデータベースであり、大量のデータに対してベクトル検索を数秒で実行できる。一方、Sparkは強力なオープンソースの分散コンピューティングフレームワークで、特に大規模なデータセットを高速かつ効率的に処理・分析するのに役立つ。
まずはMilvusのインストールから始めよう。Milvusをインストールする方法はいくつかあるが、本番環境でMilvusを使いたい場合は、以下のコマンドでDockerにMilvusをインストールして実行するのが良いだろう:
# インストールスクリプトのダウンロード
$ curl -sfL <https://raw.githubusercontent.com/milvus-io/milvus/master/scripts/standalone_embed.sh> -o standalone_embed.sh
# Dockerコンテナを起動する
$ bash standalone_embed.sh start
オープンソースのベクターデータベースとして、Milvusは多くのツールやAIフレームワークとのシームレスな統合を提供し、RAGのような本番環境に対応したAI搭載アプリケーションの構築を容易にしている。Apache Sparkは、データの取り込みとクエリの検索処理を効率的に拡張するためにMilvusと一緒に使用できるフレームワークの1つです。
MilvusとSparkによるベクトル検索パイプラインのワークフロー例](https://assets.zilliz.com/Example_of_a_vector_search_pipeline_workflow_with_Milvus_and_Spark_35c3678bd2.png)
Sparkは分散処理システムであるため、データ処理タスクを複数のコンピュータに一括して分散させることができる。この機能により、RAGアプリケーションを本番環境でデプロイする際など、大量のデータを扱う際のデータ処理が高速化される。この統合のおかげで、MilvusとMySQLのような他のデータベースサービスとの間でデータを移動することもできる。
Apache Sparkをインストールするには、 最新のインストールドキュメント を参照してください。Sparkをインストールしたら、spark-milvus jarファイルもインストールする必要があります。
wget <https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar>
spark-milvus jarファイルをダウンロードしたら、以下の手順で依存関係に追加することができる:
# pyspark の場合
./bin/pyspark --jars spark-milvus-1.0.0-SNAPSHOT.jar
# スパークシェル用
./bin/spark-shell --jars spark-milvus-1.0.0-SNAPSHOT.jar
これでMilvusとSparkを統合する準備が整った。以下の例では、Spark dataframeからMilvusに直接データを取り込む方法を紹介します。
import org.apache.spark.sql.{SaveMode, SparkSession}.
import io.milvus.client.{MilvusClient, MilvusServiceClient}.
import io.milvus.grpc.{DataType, FlushResponse, ImportResponse}.
io.milvus.param.bulkinsert.{BulkInsertParam, GetBulkInsertStateParam} をインポートします。
import io.milvus.param.collection.{CreateCollectionParam, DescribeCollectionParam, FieldType, FlushParam, LoadCollectionParam}.
インポート io.milvus.param.dml.SearchParam
インポート io.milvus.param.index.CreateIndexParam
import io.milvus.param.{ConnectParam, IndexType, MetricType, R, RpcStatus}.
import zilliztech.spark.milvus.{MilvusOptions, MilvusUtils}.
インポートzilliztech.spark.milvus.MilvusOptions._.
インポート org.apache.spark.SparkConf
インポート org.apache.spark.sql.types._
import org.apache.spark.sql.{SaveMode, SparkSession}.
インポート org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.hadoop.fs.{FileSystem, Path}.
インポート org.apache.log4j.Logger
import org.slf4j.LoggerFactory
インポート java.util
インポート scala.collection.JavaConverters._
オブジェクト Hello extends App {
val spark = SparkSession.builder().master("local[*]")
.appName("HelloSparkMilvus")
.getOrCreate()
インポート spark.implicits._
// データフレームの作成
val sampleDF = Seq(
(1, "a", Seq(1.0,2.0,3.0,4.0,5.0))、
(2, "b", Seq(1.0,2.0,3.0,4.0,5.0))、
(3, "c", Seq(1.0,2.0,3.0,4.0,5.0))、
(4, "d", Seq(1.0,2.0,3.0,4.0,5.0))
).toDF("id", "text", "vec")
// milvusのオプションを設定する
val milvusOptions = Map(
"milvus.host" -> "localhost" -> uri、
"milvus.port" -> "19530"、
"milvus.collection.name"->"hello_spark_milvus"、
"milvus.collection.vectorField" -> "vec"、
"milvus.collection.vectorDim" -> "5"、
"milvus.collection.primaryKeyField", "id"
)
sampleDF.write.format("milvus")
.options(milvusOptions)
.mode(SaveMode.Append)
.save()
}
上記のコードスニペットでは、3つのフィールドを持つSpark DataFrameを"hello_spark_milvus"というコレクションにインジェストしている。エンベッディングは5次元のベクトルで構成されており、IDをコレクションのプライマリキーとして使用しています。
また、milvusOptionsマップの中でMilvusデータベースに関するいくつかの設定を行う必要がある:
milvus.host
とmilvus.portである:milvus.hostとmilvus.port:Milvusサーバーとポート。DockerでMilvusを実行する場合、デフォルトのポートは19530である。milvus.collection.name: データを取り込むMilvusデータベース内のコレクション名。milvus.collection.vectorField: ベクトル埋め込みを含むデータのカラム名。milvus.collection.vectorDim: ベクトル埋め込みデータの次元数。milvus.collection.primaryKeyField: 主キーを含むデータのカラム名。
Milvusの様々なオプションについてもっと知りたい方は、 Milvus documentation page を参照してください。
Milvusデータベースにデータを取り込んだので、コレクションのインデックス作成方法を指定する必要があります。Milvusは通常のFlatインデックス、Inverted Flat Index (IVF)、Hierarchical Navigable Small World (HNSW)など様々なインデックス作成方法をサポートしています。
以下の例では、HNSWをカスタマイズしたAUTOINDEXを使用する。ベクトル探索の指標として、L2距離を使用する。
val ユーザー名 = <YOUR_MILVUS_USER
val パスワード = <YOUR_MILVUS_PASSWORD
val connectParam: ConnectParam = ConnectParam.newBuilder
.withHost("localhost")
.withPort("19530")
.withAuthorization(username, password)
.build
val client:MilvusClient = new MilvusServiceClient(connectParam)
val createIndexParam = CreateIndexParam.newBuilder()
.withCollectionName("hello_spark_milvus")
.withIndexName("index_name")
.withFieldName("vec")
.withMetricType(MetricType.L2)
.withIndexType(IndexType.AUTOINDEX)
.build()
val createIndexR = client.createIndex(createIndexParam)
println(createIndexR)
次に、"hello_spark_milvus"コレクションをベクター検索する前にロードする必要がある。
import io.milvus.param.collection.{CreateCollectionParam, DescribeCollectionParam, FieldType, FlushParam, LoadCollectionParam}.
// コレクションをロード、ロードされたコレクションのみ検索可能
val loadCollectionParam = LoadCollectionParam.newBuilder().withCollectionName("hello_spark_milvus").build()
val loadCollectionR = client.loadCollection(loadCollectionParam)
println(loadCollectionR)
Milvusにデータを取り込み、インデックスを作成したので、いよいよベクトル検索を行う準備が整った。先ほど作成したSpark DataFrameの最初の行を入力ベクトルとして使用します。
// 入力データフレームの最初の行を検索ベクトルとして使用する。
val fieldList: util.List[String] = new util.ArrayList[String]()
fieldList.add("vec")
val searchVectors = util.Arrays.asList(sampleDF.first().getList(2))
val searchParam = SearchParam.newBuilder()
.withCollectionName("hello_spark_milvus")
.withMetricType(MetricType.L2)
.withOutFields("text")
.withVectors(searchVectors)
.withVectorFieldName("vec")
.withTopK(2)
.build()
val searchParamR = client.search(searchParam)
println(searchParamR)
ご覧のように、ベクトル検索を行うには SearchParam.newBuilder() メソッド内に以下のようなメソッドコールをいくつか用意する必要がある:
.withCollectionName()`: ベクトル検索を行うコレクション名。
.withMetricType()`: ベクトル検索に使用するメトリック。
.withOutFields(): 結果を返すコレクションの出力フィールド。.withVectors(): 入力ベクトルまたはクエリベクトル。.withVectorFieldName(): ベクトル埋め込みを含むコレクション内のフィールド。.withTopK(): クエリベクトルと最も類似した埋め込みを持つ上位 k エントリを返す。
RAGアプリケーションでは、最も類似した上位k個のエントリが、クエリと一緒にLLMに渡されるコンテキストとして使用されます。このようにして、LLMはクエリに対する正確な答えを生成するためにコンテキストを使用することができる。
MilvusとSparkの統合を活用できる高度なユースケースはまだまだある。例えば、MySQLのような通常のデータベースからデータを読み込み、ベクトル埋め込みに変換し、それらの埋め込みをMilvusに取り込むことができます。これらのユースケースはこのGitHubリポジトリやこのMilvusノートブックデモをご覧ください。.1720103973916.1720190674148.28&__hssc=175614333.1.1720190674148&__hsfp=3708446789&_gl=19ymgcf_gcl_auNTk3NzUxMjE1LjE3MTMxOTQ3MTE._gaMjgxNDA2MTY3LjE3MTMxOTQ2OTA._ga_KKMVYG8YF2MTcyMDE5MDY3NC4zMi4xLjE3MjAxOTA3MDcuMC4wLjA._ga_HT329313WV*MTcyMDE5MDY3NC45LjEuMTcyMDE5MDcwNy4wLjAuMA).
良いRAGは良いデータから生まれる
Milvusは多くのAIツールキットやフレームワークと統合されているため、本番環境ですぐに使えるRAG(Retrieval Augmented Generation)アプリケーションの開発が簡素化されている。しかし、RAGシステムを本番環境に導入した後は、システムによって生成される応答の品質を継続的に監視することが重要です。
レスポンスの品質を改善する必要がある場合は、より複雑なアルゴリズムに飛び込む前に、まず基本的なことを検討することが重要です。重要なのは、RAGシステムで使用するデータソースの品質です。
データ・ソースを評価する際には、以下の質問を考慮してください:
ユーザーのクエリに答えるために必要なデータがデータベースにあるか?
必要なデータをすべてデータベースに収集したか?
データベースにデータを取り込む前に、正しいデータ前処理を行ったか(例:データ解析、データクリーニング、チャンキング、適切な埋め込みモデルの使用)?
データソースの品質を確認したら、次にアルゴリズムの観点からRAGシステムの品質向上を検討することができます。RAGシステムのパフォーマンスを向上させるには、以下のような方法があります:
より強力な埋め込みモデルを使用する:より強力な埋め込みモデルを使用する**:異なる事前訓練済みまたはカスタム訓練済み埋め込みモデルを試して、データ内の意味的関係を最もよく捕らえるものを見つける。
クエリルーティングとサードパーティツールの統合**:エンベッディングモデルが問題でない場合、クエリルーティングのエージェントを適用し、追加のツールやデータソースと統合することで、RAGシステムを改善することができます。
基本的なことに集中し、データソースとアルゴリズムコンポーネントを継続的に反復することで、本番環境に対応したRAGアプリケーションがユーザーに高品質の回答を確実に提供できるようになります。
結論
Sparkのような様々なフレームワークとMilvusのシームレスな統合により、スケーラブルなLLM搭載アプリケーションの構築とデプロイが容易になりました。Sparkのデータ処理タスクを複数のコンピュータにバッチで分散する機能は、データ処理オペレーションを実に高速化する。この機能は、大量のデータをベクター・データベースに取り込みたいときや、アプリケーションが大量のユーザー・クエリーを同時に処理するときに特に役立ちます。
Milvusベクトルデータベースにデータを取り込み、ユーザーからのクエリを受け取ると、ベクトル検索を実行することができます。このプロセスは、高度に文脈化された回答を生成するためにLLMに渡すために、データベース内のデータの中から最も関連性の高い文脈を取得するために非常に重要です。
読み続けて

Why I’m Against Claude Code’s Grep-Only Retrieval? It Just Burns Too Many Tokens
Learn how vector-based code retrieval cuts Claude Code token consumption by 40%. Open-source solution with easy MCP integration. Try claude-context today.

The Great AI Agent Protocol Race: Function Calling vs. MCP vs. A2A
Compare Function Calling, MCP, and A2A protocols for AI agents. Learn which standard best fits your development needs and future-proof your applications.

Vector Databases vs. Hierarchical Databases
Use a vector database for AI-powered similarity search; use a hierarchical database for organizing data in parent-child relationships with efficient top-down access patterns.
