批量插入(bulk_insert)

从文件中批量插入实体

Milvus 2.2现在支持从文件中批量插入实体。与insert()方法相比,此功能减少了Milvus客户端、代理、Pulsar和数据节点之间的网络传输。现在,您只需要几行代码就可以将一个文件或多个文件中的实体批量导入到一个集合中。

准备数据文件

将要插入Milvus集合的数据组织成基于行的JSON文件或多个NumPy文件。

基于行的JSON文件

您可以为文件命名,以使其有意义,但根键必须为root。在文件中,每个实体都以字典形式组织。字典中的键是字段名称,值是相应实体中的字段值。

以下是基于行的JSON文件的示例。

{
  "rows":[
    {"book_id": 101, "word_count": 13, "book_intro": [1.1, 1.2]},
    {"book_id": 102, "word_count": 25, "book_intro": [2.1, 2.2]},
    {"book_id": 103, "word_count": 7, "book_intro": [3.1, 3.2]},
    {"book_id": 104, "word_count": 12, "book_intro": [4.1, 4.2]},
    {"book_id": 105, "word_count": 34, "book_intro": [5.1, 5.2]}
  ]
}
 
  • 不要添加不存在于目标集合中的字段,也不要漏掉目标集合模式定义的任何字段。

  • 在每个字段中使用正确类型的值。例如,在整数字段中使用整数,在浮点字段中使用浮点数,在varchar字段中使用字符串,在向量字段中使用浮点数组。

  • 不要在JSON文件中包含自动生成的主键。

  • 对于二进制向量,请使用uint8数组。每个uint8值表示8个维度,值必须介于0和255之间。例如,[1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1]是一个16维的二进制向量,应该在JSON文件中写成[128, 7]

基于列的NumPy文件

除了上面提到的基于行的JSON文件之外,您还可以使用NumPy数组将数据集的每个列组织到单独的文件中。在这种情况下,使用每个列的字段名称来命名NumPy文件。

import numpy
numpy.save('book_id.npy', numpy.array([101, 102, 103, 104, 105]))
numpy.save('word_count.npy', numpy.array([13, 25, 7, 12, 34]))
arr = numpy.array([[1.1, 1.2],
            [2.1, 2.2],
            [3.1, 3.2],
            [4.1, 4.2],
            [5.1, 5.2]])
numpy.save('book_intro.npy', arr)
 
  • 使用每个列的字段名称来命名NumPy文件。不要添加命名为目标集合中不存在的字段的文件。每个字段应该有一个NumPy文件。

  • 创建NumPy数组时使用正确的值类型。有关详细信息,请参阅这些示例

从文件中插入实体

1. 上传数据文件

在Milvus中,您可以使用MinIO或本地硬盘进行存储。

Using the local hard disk for storage is only available in Milvus Standalone.

  • 要使用MinIO进行存储,请将数据文件上传到milvus.yml配置文件中定义的存储桶中minio.bucketName

  • 对于本地存储,请将数据文件复制到本地磁盘的目录中。

2.插入实体

为了方便从文件导入数据,Milvus提供了各种口味的批量插入API。在PyMilvus中,您可以使用do_bulk_insert() (opens in a new tab)方法。对于Java SDK,使用bulkInsert (opens in a new tab)方法。

在此方法中,您需要将目标集合的名称设置为collection_name,将在前一步骤中准备的文件列表设置为files。您还可以选择指定目标集合中特定分区的名称作为partition_name,以便Milvus仅将列出的文件数据导入此分区。

  • 对于基于行的JSON文件,参数files应该是仅包含JSON文件路径的单元素列表。

Python Java

from pymilvus import utility
task_id = utility.do_bulk_insert(
    collection_name="book",
    partition_name="2022",
    files=["test.json"]
)
 
import io.milvus.param.bulkinsert.BulkInsertParam;
import io.milvus.response.BulkInsertResponseWrapper;
import io.milvus.grpc.ImportResponse;
import io.milvus.param.R;
 
BulkInsertParam param = BulkInsertParam.newBuilder()
        .withCollectionName("book")
        .withPartitionName("2022")
        .addFile("test.json")
        .build()
R<ImportResponse> response = milvusClient.bulkInsert(param);
BulkInsertResponseWrapper wrapper = new BulkInsertResponseWrapper(response.getData());
task_id = wrapper.getTaskID();
 
  • 对于一组基于列的NumPy文件,参数files应该是包含NumPy文件路径的多元素列表。

Python Java

from pymilvus import utility
task_id = utility.do_bulk_insert(
    collection_name="book",
    partition_name="2022",
    files=["book_id.npy", "word_count.npy", "book_intro.npy"]
)
 
import io.milvus.param.bulkinsert.BulkInsertParam;
import io.milvus.response.BulkInsertResponseWrapper;
import io.milvus.grpc.ImportResponse;
import io.milvus.param.R;
 
BulkInsertParam param = BulkInsertParam.newBuilder()
        .withCollectionName("book")
        .withPartitionName("2022")
        .addFile("book_id.npy")
        .addFile("word_count.npy")
        .addFile("book_intro.npy")
        .build()
R<ImportResponse> response = milvusClient.bulkInsert(param);
BulkInsertResponseWrapper wrapper = new BulkInsertResponseWrapper(response.getData());
task_id = wrapper.getTaskID();
 

每次批量插入API调用都会立即返回。返回值是在后台运行的数据导入任务的ID。Milvus维护这样的任务队列以并行分派到空闲数据节点。

设置文件路径时,请注意

  • 如果您将数据文件上传到MinIO实例,则有效的文件路径应相对于在**"milvus.yml"中定义的根桶,例如"data/book_id.npy"**。

  • 如果您将数据文件上传到本地硬盘,则有效的文件路径应为绝对路径,例如**"/tmp/data/book_id.npy"**。 如果您有很多文件需要处理,请考虑创建多个数据导入任务并让它们并行运行

列出任务

检查任务状态

由于批量插入API是异步的,您可能需要检查数据导入任务是否已完成。Milvus提供了一个BulkInsertState对象来保存数据导入任务的详细信息,您可以使用编程语言提供的get-bulk-insert-state API来检索此对象。

在PyMilvus中,您可以使用get_bulk_insert_state() (opens in a new tab)。对于Java SDK,请使用getBulkInsertState() (opens in a new tab)

Python Java

task = utility.get_bulk_insert_state(task_id=task_id)
print("Task state:", task.state_name)
print("Imported files:", task.files)
print("Collection name:", task.collection_name)
print("Partition name:", task.partition_name)
print("Start time:", task.create_time_str)
print("Imported row count:", task.row_count)
print("Entities ID array generated by this task:", task.ids)
 
if task.state == BulkInsertState.ImportFailed:
    print("Failed reason:", task.failed_reason)
 
import io.milvus.param.bulkinsert.GetBulkInsertStateParam;
import io.milvus.response.GetBulkInsertStateWrapper;
import io.milvus.grpc.GetImportStateResponse;
import io.milvus.grpc.ImportState;
import io.milvus.param.R;
 
GetBulkInsertStateParam param = GetBulkInsertStateParam.newBuilder()
        .withTask(task_id)
        .build()
R<GetImportStateResponse> response = milvusClient.getBulkInsertState(param);
GetBulkInsertStateWrapper wrapper = new GetBulkInsertStateWrapper(response.getData());
ImportState state = wrapper.getState();
long row_count = wrapper.getImportedCount();
String create_ts = wrapper.getCreateTimeStr();
String failed_reason = wrapper.getFailedReason();
String files = wrapper.getFiles();
int progress = wrapper.getProgress();
 
状态代码描述
等待执行0任务等待执行。
执行失败1任务执行失败。使用task.failed_reason了解任务失败的原因。
已开始执行2任务已被调度到数据节点,即将执行。
持久化5已生成并持久化了新数据分段。
已完成6新分段的元数据已更新。
执行失败并已清除7任务执行失败,该任务产生的所有临时数据已被清除。

列出所有任务

Milvus还提供了list-bulk-insert-tasks API,允许您列出所有数据导入任务。在此方法中,您需要指定集合名称,以便Milvus列出导入数据到此集合的所有任务。可选地,您可以指定返回任务的最大数量的限制。

Python Java

tasks = utility.list_bulk_insert_tasks(collection_name="book", limit=10)
for task in tasks:
    print(task)
 
import io.milvus.param.bulkinsert.ListBulkInsertTasksParam;
import io.milvus.grpc. ListImportTasksResponse;
import io.milvus.grpc.GetImportStateResponse;
import io.milvus.grpc.ImportState;
import io.milvus.param.R;
 
ListBulkInsertTasksParam param = ListBulkInsertTasksParam.newBuilder()
    .withCollectionName("book")
    .build()
R<ListImportTasksResponse> response = milvusClient.listBulkInsertTasks(param);
List<GetImportStateResponse> tasks = response.getTasksList();
for (GetImportStateResponse task : tasks) {
    GetBulkInsertStateWrapper wrapper = new GetBulkInsertStateWrapper(task);
    ImportState state = wrapper.getState();
    long row_count = wrapper.getImportedCount();
    String create_ts = wrapper.getCreateTimeStr();
    String failed_reason = wrapper.getFailedReason();
    String files = wrapper.getFiles();
}
 
参数描述
collection_name (可选)指定目标集合名称以列出此集合上的所有任务。如果要列出Milvus根坐标记录的所有任务,请将该值留空。
limit (可选)指定此参数以限制返回的任务数。

有关导入任务配置的更多信息,请参见系统配置

限制

FeatureMaximum limit
Max. 任务挂起列表的最大值65536
Max. 数据文件最大值16 GB

参考资料

配置Milvus数据导入

为了让Milvus自动删除失败或旧的数据导入任务,您可以在Milvus配置文件中指定数据导入任务的超时持续时间和保留期。

rootCoord:
  # (in seconds) Duration after which an import task will expire (be killed). Default 900 seconds (15 minutes).
  # Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
  importTaskExpiration: 900
  # (in seconds) Milvus will keep the record of import tasks for at least `importTaskRetention` seconds. Default 86400
  # seconds (24 hours).
  # Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
  importTaskRetention: 86400
 

创建NumPy文件

以下示例演示如何为Milvus支持的数据类型的列创建NumPy文件。

  • 从布尔数组创建Numpy文件
import numpy as np
data = [True, False, True, False]
dt = np.dtype('bool', (len(data)))
arr = np.array(data, dtype=dt)
np.save(file_path, arr)
 
  • 从int8数组创建NumPy文件
import numpy as np
data = [1, 2, 3, 4]
dt = np.dtype('int8', (len(data)))
arr = np.array(data, dtype=dt)
np.save(file_path, arr)
 
  • 从int16数组创建NumPy文件
import numpy as np
data = [1, 2, 3, 4]
dt = np.dtype('int16', (len(data)))
arr = np.array(data, dtype=dt)
np.save(file_path, arr)
 
  • 从int32数组创建NumPy文件
import numpy as np
data = [1, 2, 3, 4]
dt = np.dtype('int32', (len(data)))
arr = np.array(data, dtype=dt)
np.save(file_path, arr)
 
  • 从int64数组创建NumPy文件
import numpy as np
data = [1, 2, 3, 4]
dt = np.dtype('int64', (len(data)))
arr = np.array(data, dtype=dt)
np.save(file_path, arr)
 
  • 从浮点数数组创建NumPy文件
import numpy as np
data = [0.1, 0.2, 0.3, 0.4]
dt = np.dtype('float32', (len(data)))
arr = np.array(data, dtype=dt)
np.save(file_path, arr)
 
  • 从双精度浮点数数组创建NumPy文件
import numpy as np
data = [0.1, 0.2, 0.3, 0.4]
dt = np.dtype('float64', (len(data)))
arr = np.array(data, dtype=dt)
np.save(file_path, arr)
 
  • 从VARCHAR数组创建NumPy文件
data = ["a", "b", "c", "d"]
arr = np.array(data)
np.save(file_path, arr)
 
  • 从二进制向量数组创建NumPy文件

对于二进制向量,使用uint8作为NumPy数据类型。每个uint8值表示8个维度。对于32维二进制向量,使用四个uint8值。

data = [
    [43, 35, 124, 90],
    [65, 212, 12, 57],
    [6, 126, 232, 78],
    [87, 189, 38, 22],
]
dt = np.dtype('uint8', (len(data), 4))
arr = np.array(data)
np.save(file_path, arr)
 
  • 从浮点向量数组创建NumPy文件

在Milvus中,您可以使用float32或float64值来构成浮点向量。

下面的代码片段创建了一个NumPy文件,其中包含使用float32值形成的8维向量数组。

data = [
    [1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8],
    [2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.7, 2.8],
    [3.1, 3.2, 3.3, 3.4, 3.5, 3.6, 3.7, 3.8],
    [4.1, 4.2, 4.3, 4.4, 4.5, 4.6, 4.7, 4.8],
]
dt = np.dtype('float32', (len(data), 8))
arr = np.array(data)
np.save(file_path, arr)
 

Import multiple NumPy files in parallel

您可以将NumPy文件上传到不同的子目录中,创建多个导入任务,并并行执行它们。

假设数据结构如下:

 
├── task_1
    └── book_id.npy
    └── word_count.npy
    └── book_intro.npy
├── task_2
    └── book_id.npy
    └── word_count.npy
    └── book_intro.npy
 

您可以按如下方式创建多个数据导入任务

task_1 = utility.do_bulk_insert(
    collection_name="book",
    files=["task_1/book_id.npy", "task_1/word_count.npy", "task_1/book_intro.npy"]
)
task_2 = utility.do_bulk_insert(
    collection_name="book",
    files=["task_2/book_id.npy", "task_2/word_count.npy", "task_2/book_intro.npy"]
)
 

检查数据可搜索性

数据导入任务完成后,Milvus将导入的数据保存到段中,并将这些段发送到索引节点进行索引构建。在索引构建过程中,这些段不可用于搜索。一旦完成此类过程,您需要再次调用load API将这些段加载到查询节点中。这些段将准备好进行搜索。

  • 检查索引构建进度

PyMilvus提供了一种实用方法,等待索引构建过程完成。

utility.wait_for_index_building_complete(collection_name)
 

在其他SDK中,您可以使用describe-index API检查索引构建进度。

while (true) {
    R<DescribeIndexResponse> response = milvusClient.describeIndex(
        DescribeIndexParam.newBuilder()
            .withCollectionName(collection_name)
            .withIndexName(index_name)
            .build());
    IndexDescription desc = response.getData().getIndexDescriptions(0);
    if (desc.getIndexedRows() == desc.getTotalRows()) {
        break;
    }
}
 
  • 将新段加载到查询节点中

需要手动加载新索引段,如下所示:

Python Java

collection.load(_refresh = True)
 
R<RpcStatus> response = milvusClient.loadCollection(
    LoadCollectionParam.newBuilder()
        .withCollectionName(collection_name)
        .withRefresh(Boolean.TRUE)
        .build());
 

默认情况下,_refresh参数为false。在首次加载集合时不要将其设置为true

withRefresh()方法是可选的。在首次加载集合时不要使用Boolean.TRUE进行调用。

下一步操作

学习更多Milvus的基本操作: