Commit 59076e2

Merge pull request #139 from zilliztech/0415_fix_timestamp_tz_error
0415 fix timestamp tz error
2 parents 02fd2df + e86e8a9

File tree: 6 files changed, +389 −15 lines changed

docs/en/connector-v2/sink/Milvus.md

Lines changed: 63 additions & 1 deletion

@@ -36,6 +36,8 @@ This Milvus sink connector write data to Milvus or Zilliz Cloud, it has the foll
 | FLOAT16_VECTOR      | FLOAT16_VECTOR      |
 | BFLOAT16_VECTOR     | BFLOAT16_VECTOR     |
 | SPARSE_FLOAT_VECTOR | SPARSE_FLOAT_VECTOR |
+| TIMESTAMPTZ         | TIMESTAMP_TZ        |
+| GEOMETRY            | GEOMETRY            |

 ## Sink Options

@@ -49,10 +51,51 @@ This Milvus sink connector write data to Milvus or Zilliz Cloud, it has the foll
 | enable_upsert        | boolean | No | false | Upsert data instead of insert.                      |
 | enable_dynamic_field | boolean | No | true  | Enable creating the collection with dynamic fields. |
 | batch_size           | int     | No | 1000  | Write batch size.                                   |
-| partition_key        | String  | No |       | Milvus partition key field                          |
+| partition_key        | String  | No |       | Milvus partition key field                          |
+| collection_rename    | Map     | No | {}    | Rename collections: `{source_name = "target_name"}` |
+| field_schema         | List    | No | []    | Per-field schema configuration. See below.          |
+
+## Field Schema
+
+When `field_schema` is supplied, only the fields defined in it are written. If it is empty, the full source schema is used.
+
+Each field object supports:
+
+| Property           | Type    | Required | Description |
+|--------------------|---------|----------|-------------|
+| field_name         | String  | Yes*     | Target field name in the Milvus collection. |
+| source_field_name  | String  | Yes*     | Source field name. If both are provided, `field_name` is the target name. |
+| data_type          | Integer | Yes      | Milvus data type code (e.g. Int64=5, VarChar=21, FloatVector=101, Timestamptz=26). |
+| is_primary_key     | Boolean | No       | Mark the field as the primary key. |
+| auto_id            | Boolean | No       | Enable auto ID for the primary key. |
+| dimension          | Integer | No       | Required for vector types. |
+| max_length         | Integer | No       | Maximum length for VarChar fields. |
+| element_type       | Integer | No       | Element type for Array fields. |
+| max_capacity       | Integer | No       | Maximum capacity for Array fields. Default: 4096. |
+| is_nullable        | Boolean | No       | Whether the field is nullable. |
+| is_partition_key   | Boolean | No       | Mark the field as a partition key. |
+| timezone           | String  | No       | IANA timezone ID (e.g. `Asia/Shanghai`, `US/Eastern`) or UTC offset (e.g. `+08:00`) used to interpret tz-naive source timestamps when writing to Milvus Timestamptz fields. If not set, falls back to the JVM default timezone. See the usage guidance below. |
+
+\* At least one of `field_name` or `source_field_name` is required.
+
+### When to use the `timezone` property
+
+The `timezone` property is only needed when the **source value does not carry timezone information**. If the source value already has a timezone, do not configure it; the existing conversion handles it correctly.
+
+| Source type | Example | Has timezone? | Configure `timezone`? |
+|---|---|---|---|
+| PostgreSQL `timestamp` (without tz) | `2024-01-02 08:00:00` | No | **Yes** (specify the intended timezone) |
+| PostgreSQL `timestamptz` | `2024-01-02 08:00:00+08` | Yes | **No** (already carries an offset) |
+| MySQL `datetime` | `2024-01-02 08:00:00` | No | **Yes** |
+| ES `date` (epoch_millis or with offset) | `1704153600000` | Yes | **No** (internally UTC) |
+| ES `date` (custom format without offset) | `2024-01-02 08:00:00` | No | **Yes**, but only if ALL values in the field lack an offset |
+
+**Warning:** If a source field contains a mix of values with and without timezone information (e.g. an Elasticsearch `date` field with multiple formats), do not configure `timezone`. The existing systemDefault-based conversion handles the timezone-aware values correctly; adding a `timezone` override would cause double conversion for those values.
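The interpretation rule above can be illustrated with plain `java.time` (a standalone sketch, not the connector's actual code path): a tz-naive wall-clock value maps to a different instant depending on the zone it is interpreted in.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class TzNaiveExample {
    public static void main(String[] args) {
        // A tz-naive source value, e.g. PostgreSQL `timestamp` or MySQL `datetime`.
        LocalDateTime naive = LocalDateTime.parse("2024-01-02T08:00:00");

        // Interpreting it in the configured timezone yields the intended instant.
        Instant asShanghai = naive.atZone(ZoneId.of("Asia/Shanghai")).toInstant();
        System.out.println(asShanghai); // 2024-01-02T00:00:00Z

        // The same wall-clock time read as UTC is a different instant, 8 hours later.
        Instant asUtc = naive.atZone(ZoneId.of("UTC")).toInstant();
        System.out.println(asUtc); // 2024-01-02T08:00:00Z
    }
}
```

Note that the Shanghai interpretation produces epoch millis `1704153600000`, matching the ES `epoch_millis` example in the table above.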

 ## Task Example

+### Basic
+
 ```bash
 sink {
   Milvus {
@@ -63,6 +106,25 @@ sink {
 }
 ```

+### With field_schema and per-field timezone
+
+```bash
+sink {
+  Milvus {
+    url = "http://127.0.0.1:19530"
+    token = "username:password"
+    database = "default"
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+    field_schema = [
+      {field_name = "id", data_type = 5, is_primary_key = true}
+      {field_name = "title", data_type = 21, max_length = 512}
+      {field_name = "created_at", data_type = 26, is_nullable = true, timezone = "Asia/Shanghai"}
+      {field_name = "embedding", data_type = 101, dimension = 768}
+    ]
+  }
+}
+```

 ## Changelog

 <ChangeLog />

docs/zh/connector-v2/sink/Milvus.md

Lines changed: 63 additions & 1 deletion

@@ -36,6 +36,8 @@ The Milvus sink connector writes data to Milvus or Zilliz Cloud. It has the following features
 | FLOAT16_VECTOR      | FLOAT16_VECTOR      |
 | BFLOAT16_VECTOR     | BFLOAT16_VECTOR     |
 | SPARSE_FLOAT_VECTOR | SPARSE_FLOAT_VECTOR |
+| TIMESTAMPTZ         | TIMESTAMP_TZ        |
+| GEOMETRY            | GEOMETRY            |

 ## Sink Options

@@ -49,10 +51,51 @@ The Milvus sink connector writes data to Milvus or Zilliz Cloud. It has the following features
 | enable_upsert        | boolean | No | false | Whether to enable upsert.                           |
 | enable_dynamic_field | boolean | No | true  | Whether to create the table with dynamic fields.    |
 | batch_size           | int     | No | 1000  | Write batch size.                                   |
-| partition_key        | String  | No |       | Milvus partition key field                          |
+| partition_key        | String  | No |       | Milvus partition key field                          |
+| collection_rename    | Map     | No | {}    | Rename collections: `{source_name = "target_name"}` |
+| field_schema         | List    | No | []    | Per-field schema configuration. See below.          |
+
+## Field Schema Configuration
+
+When `field_schema` is configured, only the fields defined in it are written. If left empty, the full source schema is used.
+
+Each field object supports the following properties:
+
+| Property           | Type    | Required | Description |
+|--------------------|---------|----------|-------------|
+| field_name         | String  | Yes*     | Target field name in the Milvus collection. |
+| source_field_name  | String  | Yes*     | Source field name. When both are provided, `field_name` is used as the target name. |
+| data_type          | Integer | Yes      | Milvus data type code (e.g. Int64=5, VarChar=21, FloatVector=101, Timestamptz=26). |
+| is_primary_key     | Boolean | No       | Mark the field as the primary key. |
+| auto_id            | Boolean | No       | Enable auto ID for the primary key. |
+| dimension          | Integer | No       | Required for vector types. |
+| max_length         | Integer | No       | Maximum length for VarChar fields. |
+| element_type       | Integer | No       | Element type for Array fields. |
+| max_capacity       | Integer | No       | Maximum capacity for Array fields. Default: 4096. |
+| is_nullable        | Boolean | No       | Whether the field is nullable. |
+| is_partition_key   | Boolean | No       | Mark the field as a partition key. |
+| timezone           | String  | No       | IANA timezone ID (e.g. `Asia/Shanghai`, `US/Eastern`) or UTC offset (e.g. `+08:00`), used to interpret tz-naive source timestamps when writing to Milvus Timestamptz fields. Falls back to the JVM default timezone when unset. See the usage notes below. |
+
+\* At least one of `field_name` or `source_field_name` must be provided.
+
+### Usage notes for the `timezone` property
+
+Configure `timezone` only when the **source value itself carries no timezone information**. If the source value already has a timezone, do not configure it; the existing conversion logic handles it correctly.
+
+| Source type | Example | Has timezone? | Configure `timezone`? |
+|---|---|---|---|
+| PostgreSQL `timestamp` (without tz) | `2024-01-02 08:00:00` | No | **Yes** (specify the data's actual timezone) |
+| PostgreSQL `timestamptz` | `2024-01-02 08:00:00+08` | Yes | **No** (already carries an offset) |
+| MySQL `datetime` | `2024-01-02 08:00:00` | No | **Yes** |
+| ES `date` (epoch_millis or with offset) | `1704153600000` | Yes | **No** (internally UTC) |
+| ES `date` (custom format without offset) | `2024-01-02 08:00:00` | No | **Yes**, but only when every value in the field lacks an offset |
+
+**Note:** If a source field mixes values with and without timezone information (e.g. an Elasticsearch `date` field using multiple formats), do not configure `timezone`. The existing systemDefault-based conversion handles the timezone-aware values correctly; configuring `timezone` on top of it would double-convert those values.

 ## Task Example

+### Basic
+
 ```bash
 sink {
   Milvus {
@@ -63,6 +106,25 @@ sink {
 }
 ```

+### With field_schema and per-field timezone
+
+```bash
+sink {
+  Milvus {
+    url = "http://127.0.0.1:19530"
+    token = "username:password"
+    database = "default"
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+    field_schema = [
+      {field_name = "id", data_type = 5, is_primary_key = true}
+      {field_name = "title", data_type = 21, max_length = 512}
+      {field_name = "created_at", data_type = 26, is_nullable = true, timezone = "Asia/Shanghai"}
+      {field_name = "embedding", data_type = 101, dimension = 768}
+    ]
+  }
+}
+```

 ## Changelog

 <ChangeLog />

seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java

Lines changed: 31 additions & 0 deletions

@@ -18,6 +18,8 @@
 package org.apache.seatunnel.connectors.seatunnel.milvus.sink;

 import com.google.auto.service.AutoService;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
@@ -27,8 +29,12 @@
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.milvus.sink.catalog.MilvusFieldSchema;
 import org.apache.seatunnel.connectors.seatunnel.milvus.sink.config.MilvusSinkConfig;

+import java.time.ZoneId;
+import java.util.List;
+
 @AutoService(Factory.class)
 public class MilvusSinkFactory implements TableSinkFactory {

@@ -51,10 +57,35 @@ public OptionRule optionRule() {

     public TableSink createSink(TableSinkFactoryContext context) {
         ReadonlyConfig config = context.getOptions();
+        validateFieldTimezones(config);
         CatalogTable catalogTable = renameCatalogTable(config, context.getCatalogTable());
         return () -> new MilvusSink(config, catalogTable);
     }

+    private void validateFieldTimezones(ReadonlyConfig config) {
+        List<Object> rawFieldSchema = config.get(MilvusSinkConfig.FIELD_SCHEMA);
+        if (rawFieldSchema == null || rawFieldSchema.isEmpty()) {
+            return;
+        }
+        Gson gson = new Gson();
+        List<MilvusFieldSchema> fieldSchemaList =
+                gson.fromJson(
+                        gson.toJson(rawFieldSchema),
+                        new TypeToken<List<MilvusFieldSchema>>() {}.getType());
+        for (MilvusFieldSchema fs : fieldSchemaList) {
+            if (fs.getTimezone() != null && !fs.getTimezone().isEmpty()) {
+                try {
+                    ZoneId.of(fs.getTimezone());
+                } catch (Exception e) {
+                    throw new IllegalArgumentException(
+                            "Invalid timezone '" + fs.getTimezone()
+                                    + "' for field '" + fs.getEffectiveFieldName()
+                                    + "'. Use IANA zone ID (e.g. 'Asia/Shanghai') "
+                                    + "or UTC offset (e.g. '+08:00').", e);
+                }
+            }
+        }
+    }
+
     private CatalogTable renameCatalogTable(
             ReadonlyConfig config, CatalogTable sourceCatalogTable) {
         TableIdentifier sourceTableId = sourceCatalogTable.getTableId();
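The validation in `validateFieldTimezones` relies on `java.time.ZoneId.of` accepting both IANA region IDs and UTC offsets while rejecting unknown zones. A minimal standalone sketch of that check (the class and method names here are illustrative, not part of the connector):

```java
import java.time.ZoneId;

public class TimezoneCheck {
    // True when the string is a valid IANA zone ID or UTC offset,
    // mirroring the try/catch around ZoneId.of in validateFieldTimezones.
    static boolean isValidTimezone(String tz) {
        try {
            ZoneId.of(tz);
            return true;
        } catch (Exception e) {
            // ZoneId.of throws DateTimeException / ZoneRulesException
            // for malformed input or unknown region IDs.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValidTimezone("Asia/Shanghai")); // region ID: true
        System.out.println(isValidTimezone("+08:00"));        // UTC offset: true
        System.out.println(isValidTimezone("Asia/NotACity")); // unknown region: false
    }
}
```

Failing fast in `createSink` means a typo like `Asia/NotACity` surfaces as a clear config error at job submission rather than as a write failure at runtime.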

seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/catalog/MilvusFieldSchema.java

Lines changed: 3 additions & 0 deletions

@@ -64,6 +64,9 @@ public class MilvusFieldSchema {
     @SerializedName("enable_match")
     private Boolean enableMatch;

+    @SerializedName("timezone")
+    private String timezone;
+
     /**
      * Get the effective field name (field_name if specified, otherwise source_field_name)
      */

0 commit comments