Skip to content

Commit 330c0e6

Browse files
alohaha22claude
andcommitted
[Feature][Connector-Milvus] support per-field timezone for naive timestamp to Timestamptz conversion
When source columns are tz-naive (e.g. PostgreSQL 'timestamp' without timezone, MySQL 'datetime'), users can now specify a 'timezone' property per field in field_schema to control how the wall-clock value is interpreted when writing to Milvus Timestamptz fields. Accepts IANA zone IDs (e.g. 'Asia/Shanghai') and UTC offsets (e.g. '+08:00'). Without the override, falls back to JVM systemDefault for backward compatibility. OffsetDateTime and java.sql.Timestamp values are not affected — they already carry absolute instants. String values use the configured timezone when set, or the original behavior when not set. Timezone values are validated at job submission time in MilvusSinkFactory to fail fast on invalid zone IDs. Config example: field_schema = [ {field_name = "created_at", data_type = 26, timezone = "Asia/Shanghai"} ] Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 02fd2df commit 330c0e6

File tree

4 files changed

+263
-13
lines changed

4 files changed

+263
-13
lines changed

seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
1919

2020
import com.google.auto.service.AutoService;
21+
import com.google.gson.Gson;
22+
import com.google.gson.reflect.TypeToken;
2123
import org.apache.commons.lang3.StringUtils;
2224
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2325
import org.apache.seatunnel.api.configuration.util.OptionRule;
@@ -27,8 +29,12 @@
2729
import org.apache.seatunnel.api.table.factory.Factory;
2830
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
2931
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
32+
import org.apache.seatunnel.connectors.seatunnel.milvus.sink.catalog.MilvusFieldSchema;
3033
import org.apache.seatunnel.connectors.seatunnel.milvus.sink.config.MilvusSinkConfig;
3134

35+
import java.time.ZoneId;
36+
import java.util.List;
37+
3238
@AutoService(Factory.class)
3339
public class MilvusSinkFactory implements TableSinkFactory {
3440

@@ -51,10 +57,35 @@ public OptionRule optionRule() {
5157

5258
public TableSink createSink(TableSinkFactoryContext context) {
5359
ReadonlyConfig config = context.getOptions();
60+
validateFieldTimezones(config);
5461
CatalogTable catalogTable = renameCatalogTable(config, context.getCatalogTable());
5562
return () -> new MilvusSink(config, catalogTable);
5663
}
5764

65+
private void validateFieldTimezones(ReadonlyConfig config) {
66+
List<Object> rawFieldSchema = config.get(MilvusSinkConfig.FIELD_SCHEMA);
67+
if (rawFieldSchema == null || rawFieldSchema.isEmpty()) {
68+
return;
69+
}
70+
Gson gson = new Gson();
71+
List<MilvusFieldSchema> fieldSchemaList = gson.fromJson(
72+
gson.toJson(rawFieldSchema),
73+
new TypeToken<List<MilvusFieldSchema>>() {}.getType());
74+
for (MilvusFieldSchema fs : fieldSchemaList) {
75+
if (fs.getTimezone() != null && !fs.getTimezone().isEmpty()) {
76+
try {
77+
ZoneId.of(fs.getTimezone());
78+
} catch (Exception e) {
79+
throw new IllegalArgumentException(
80+
"Invalid timezone '" + fs.getTimezone()
81+
+ "' for field '" + fs.getEffectiveFieldName()
82+
+ "'. Use IANA zone ID (e.g. 'Asia/Shanghai') "
83+
+ "or UTC offset (e.g. '+08:00').", e);
84+
}
85+
}
86+
}
87+
}
88+
5889
private CatalogTable renameCatalogTable(
5990
ReadonlyConfig config, CatalogTable sourceCatalogTable) {
6091
TableIdentifier sourceTableId = sourceCatalogTable.getTableId();

seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/catalog/MilvusFieldSchema.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ public class MilvusFieldSchema {
6464
@SerializedName("enable_match")
6565
private Boolean enableMatch;
6666

67+
@SerializedName("timezone")
68+
private String timezone;
69+
6770
/**
6871
* Get the effective field name (field_name if specified, otherwise source_field_name)
6972
*/

seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/utils/MilvusSinkConverter.java

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@
4343
import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
4444
import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
4545
import org.apache.seatunnel.connectors.seatunnel.milvus.sink.catalog.MilvusFieldSchema;
46+
import org.apache.seatunnel.connectors.seatunnel.milvus.sink.config.MilvusSinkConfig;
47+
48+
import lombok.extern.slf4j.Slf4j;
4649

4750
import java.nio.ByteBuffer;
4851
import java.time.LocalDateTime;
@@ -51,11 +54,13 @@
5154
import java.util.ArrayList;
5255
import java.util.Arrays;
5356
import java.util.Collections;
57+
import java.util.HashMap;
5458
import java.util.List;
5559
import java.util.Map;
5660
import java.util.Objects;
5761
import java.util.stream.Collectors;
5862

63+
@Slf4j
5964
public class MilvusSinkConverter {
6065
private static final Gson gson = new Gson();
6166

@@ -68,16 +73,19 @@ public class MilvusSinkConverter {
6873
* file; it lives entirely on {@code GeometryConverter}.
6974
*/
7075
private final GeometryConverter geometryConverter;
76+
private final Map<String, ZoneId> fieldTimezoneOverrides;
7177

7278
public MilvusSinkConverter() {
73-
// Matches the production default (GEOMETRY_CONVERT_MODE = "passthrough"):
74-
// Geometry strings flow byte-for-byte to Milvus. Tests that need the
75-
// parsing path should construct with new MilvusSinkConverter(GeometryConverter.PARSE).
76-
this(GeometryConverter.PASSTHROUGH);
79+
this(GeometryConverter.PASSTHROUGH, Collections.emptyMap());
7780
}
7881

7982
public MilvusSinkConverter(GeometryConverter geometryConverter) {
83+
this(geometryConverter, Collections.emptyMap());
84+
}
85+
86+
public MilvusSinkConverter(GeometryConverter geometryConverter, Map<String, ZoneId> fieldTimezoneOverrides) {
8087
this.geometryConverter = geometryConverter;
88+
this.fieldTimezoneOverrides = fieldTimezoneOverrides;
8189
}
8290

8391
/**
@@ -87,7 +95,31 @@ public MilvusSinkConverter(GeometryConverter geometryConverter) {
8795
* they need without polluting the writer with sub-domain knowledge.
8896
*/
8997
public static MilvusSinkConverter fromConfig(ReadonlyConfig config) {
90-
return new MilvusSinkConverter(GeometryConverter.fromConfig(config));
98+
GeometryConverter geo = GeometryConverter.fromConfig(config);
99+
Map<String, ZoneId> tzOverrides = parseFieldTimezones(config);
100+
return new MilvusSinkConverter(geo, tzOverrides);
101+
}
102+
103+
private static Map<String, ZoneId> parseFieldTimezones(ReadonlyConfig config) {
104+
List<Object> rawFieldSchema = config.get(MilvusSinkConfig.FIELD_SCHEMA);
105+
if (rawFieldSchema == null || rawFieldSchema.isEmpty()) {
106+
return Collections.emptyMap();
107+
}
108+
List<MilvusFieldSchema> fieldSchemaList = gson.fromJson(
109+
gson.toJson(rawFieldSchema),
110+
new TypeToken<List<MilvusFieldSchema>>() {}.getType());
111+
Map<String, ZoneId> result = new HashMap<>();
112+
for (MilvusFieldSchema fs : fieldSchemaList) {
113+
if (fs.getTimezone() != null && !fs.getTimezone().isEmpty()) {
114+
String name = fs.getEffectiveFieldName();
115+
if (name != null) {
116+
ZoneId zone = ZoneId.of(fs.getTimezone());
117+
result.put(name, zone);
118+
log.info("Milvus field '{}' naive-to-aware timezone: {}", name, zone);
119+
}
120+
}
121+
}
122+
return result;
91123
}
92124

93125
public Object convertBySeaTunnelType(
@@ -485,24 +517,39 @@ public Object convertByMilvusType(FieldSchema fieldSchema, Object value) {
485517
case Timestamptz:
486518
// Milvus SDK requires Timestamptz as ISO 8601 String format (e.g., "2024-01-19T11:30:45Z")
487519
// Reference: Milvus ParamUtils.java line 430+ requires String type for Timestamptz
520+
ZoneId tzOverride = fieldTimezoneOverrides.get(fieldSchema.getName());
488521
if (value instanceof java.sql.Timestamp) {
489-
// Convert java.sql.Timestamp to ISO 8601 string (UTC)
522+
// java.sql.Timestamp internally holds epoch millis (an absolute instant).
523+
// toInstant() is always correct — per-field timezone does not apply.
490524
return ((java.sql.Timestamp) value).toInstant().toString();
491525
} else if (value instanceof LocalDateTime) {
492-
// Convert LocalDateTime (systemDefault) to ISO 8601 string
493-
return ((LocalDateTime) value).atZone(ZoneId.systemDefault()).toInstant().toString();
526+
ZoneId zone = tzOverride != null ? tzOverride : ZoneId.systemDefault();
527+
return ((LocalDateTime) value).atZone(zone).toInstant().toString();
494528
} else if (value instanceof OffsetDateTime) {
495-
// Convert OffsetDateTime to ISO 8601 string (preserves timezone)
529+
// OffsetDateTime already carries offset — toInstant() is always correct.
530+
// Per-field timezone does not apply.
496531
return ((OffsetDateTime) value).toInstant().toString();
497532
} else if (value instanceof String) {
498-
// Already a string - validate and/or convert to ISO 8601 format
499-
String strValue = value.toString();
500-
// Try to parse and normalize to ISO 8601
533+
String strValue = value.toString().trim();
534+
if (tzOverride != null) {
535+
// User explicitly configured timezone — parse as naive wall-clock
536+
// and apply the configured zone, consistent with LocalDateTime path.
537+
// Timestamp.valueOf only accepts "yyyy-MM-dd HH:mm:ss[.fff]" format,
538+
// so strings with timezone info (Z, +08:00, etc.) naturally fail
539+
// and pass through as-is.
540+
try {
541+
LocalDateTime ldt = java.sql.Timestamp.valueOf(
542+
strValue.replace('T', ' ')).toLocalDateTime();
543+
return ldt.atZone(tzOverride).toInstant().toString();
544+
} catch (IllegalArgumentException e) {
545+
return strValue;
546+
}
547+
}
548+
// No timezone override — original behavior
501549
try {
502550
java.sql.Timestamp ts = java.sql.Timestamp.valueOf(strValue);
503551
return ts.toInstant().toString();
504552
} catch (IllegalArgumentException e) {
505-
// If it's already in ISO 8601 format, return as-is
506553
return strValue;
507554
}
508555
}

seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/utils/MilvusSinkConverterTest.java

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,18 @@
2626
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2727
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2828
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
29+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2930
import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
3031
import org.junit.jupiter.api.Assertions;
3132
import org.junit.jupiter.api.Test;
3233

3334
import java.time.LocalDateTime;
3435
import java.time.OffsetDateTime;
36+
import java.time.ZoneId;
3537
import java.time.ZoneOffset;
3638
import java.util.HashMap;
3739
import java.util.List;
40+
import java.util.Map;
3841

3942
public class MilvusSinkConverterTest {
4043

@@ -134,6 +137,172 @@ public void testConvertByMilvusType_Timestamptz_InvalidStringFallback() {
134137
Assertions.assertEquals("not-a-date", result);
135138
}
136139

140+
// --- Per-field timezone handling for Timestamptz ---
141+
142+
@Test
143+
public void testConvertByMilvusType_Timestamptz_LocalDateTime_PerFieldShanghai() {
144+
Map<String, ZoneId> overrides = new HashMap<>();
145+
overrides.put("ts", ZoneId.of("Asia/Shanghai"));
146+
MilvusSinkConverter shanghaiConverter = new MilvusSinkConverter(
147+
GeometryConverter.PASSTHROUGH, overrides);
148+
FieldSchema schema = FieldSchema.builder()
149+
.name("ts").dataType(DataType.Timestamptz).build();
150+
LocalDateTime ldt = LocalDateTime.of(2024, 1, 19, 10, 0, 0);
151+
Object result = shanghaiConverter.convertByMilvusType(schema, ldt);
152+
// 10:00 Shanghai = 02:00 UTC
153+
Assertions.assertEquals("2024-01-19T02:00:00Z", result);
154+
}
155+
156+
@Test
157+
public void testConvertByMilvusType_Timestamptz_LocalDateTime_PerFieldUtc() {
158+
Map<String, ZoneId> overrides = new HashMap<>();
159+
overrides.put("ts", ZoneId.of("UTC"));
160+
MilvusSinkConverter utcConverter = new MilvusSinkConverter(
161+
GeometryConverter.PASSTHROUGH, overrides);
162+
FieldSchema schema = FieldSchema.builder()
163+
.name("ts").dataType(DataType.Timestamptz).build();
164+
LocalDateTime ldt = LocalDateTime.of(2024, 1, 19, 10, 0, 0);
165+
Object result = utcConverter.convertByMilvusType(schema, ldt);
166+
Assertions.assertEquals("2024-01-19T10:00:00Z", result);
167+
}
168+
169+
@Test
170+
public void testConvertByMilvusType_Timestamptz_LocalDateTime_PerFieldUtcOffset() {
171+
// UTC offset format "+08:00" should work the same as IANA "Asia/Shanghai"
172+
Map<String, ZoneId> overrides = new HashMap<>();
173+
overrides.put("ts", ZoneId.of("+08:00"));
174+
MilvusSinkConverter offsetConverter = new MilvusSinkConverter(
175+
GeometryConverter.PASSTHROUGH, overrides);
176+
FieldSchema schema = FieldSchema.builder()
177+
.name("ts").dataType(DataType.Timestamptz).build();
178+
LocalDateTime ldt = LocalDateTime.of(2024, 1, 19, 10, 0, 0);
179+
Object result = offsetConverter.convertByMilvusType(schema, ldt);
180+
// 10:00 +08:00 = 02:00 UTC
181+
Assertions.assertEquals("2024-01-19T02:00:00Z", result);
182+
}
183+
184+
@Test
185+
public void testConvertByMilvusType_Timestamptz_LocalDateTime_NoOverrideFallsBackToSystemDefault() {
186+
// No per-field override → systemDefault
187+
FieldSchema schema = FieldSchema.builder()
188+
.name("ts").dataType(DataType.Timestamptz).build();
189+
LocalDateTime ldt = LocalDateTime.of(2024, 1, 19, 10, 0, 0);
190+
Object result = converter.convertByMilvusType(schema, ldt);
191+
String expected = ldt.atZone(ZoneId.systemDefault()).toInstant().toString();
192+
Assertions.assertEquals(expected, result);
193+
}
194+
195+
@Test
196+
public void testConvertByMilvusType_Timestamptz_MultipleFieldsDifferentZones() {
197+
Map<String, ZoneId> overrides = new HashMap<>();
198+
overrides.put("created_at", ZoneId.of("Asia/Shanghai"));
199+
overrides.put("updated_at", ZoneId.of("US/Eastern"));
200+
MilvusSinkConverter multiConverter = new MilvusSinkConverter(
201+
GeometryConverter.PASSTHROUGH, overrides);
202+
203+
LocalDateTime ldt = LocalDateTime.of(2024, 6, 15, 12, 0, 0);
204+
205+
FieldSchema createdSchema = FieldSchema.builder()
206+
.name("created_at").dataType(DataType.Timestamptz).build();
207+
Object createdResult = multiConverter.convertByMilvusType(createdSchema, ldt);
208+
// 12:00 Shanghai (+8) = 04:00 UTC
209+
Assertions.assertEquals("2024-06-15T04:00:00Z", createdResult);
210+
211+
FieldSchema updatedSchema = FieldSchema.builder()
212+
.name("updated_at").dataType(DataType.Timestamptz).build();
213+
Object updatedResult = multiConverter.convertByMilvusType(updatedSchema, ldt);
214+
// 12:00 US/Eastern (EDT = -4 in June) = 16:00 UTC
215+
Assertions.assertEquals("2024-06-15T16:00:00Z", updatedResult);
216+
}
217+
218+
@Test
219+
public void testFromConfig_FieldSchemaTimezone_WiredToConverter() {
220+
// Simulate field_schema config with timezone
221+
Map<String, Object> fieldEntry = new HashMap<>();
222+
fieldEntry.put("field_name", "event_time");
223+
fieldEntry.put("data_type", 26); // Timestamptz
224+
fieldEntry.put("timezone", "Asia/Shanghai");
225+
226+
List<Object> fieldSchemaList = new java.util.ArrayList<>();
227+
fieldSchemaList.add(fieldEntry);
228+
229+
Map<String, Object> configMap = new HashMap<>();
230+
configMap.put("field_schema", fieldSchemaList);
231+
ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
232+
MilvusSinkConverter shanghaiConverter = MilvusSinkConverter.fromConfig(config);
233+
234+
FieldSchema schema = FieldSchema.builder()
235+
.name("event_time").dataType(DataType.Timestamptz).build();
236+
LocalDateTime ldt = LocalDateTime.of(2024, 1, 19, 10, 0, 0);
237+
Object result = shanghaiConverter.convertByMilvusType(schema, ldt);
238+
// 10:00 Shanghai = 02:00 UTC
239+
Assertions.assertEquals("2024-01-19T02:00:00Z", result);
240+
}
241+
242+
@Test
243+
public void testConvertByMilvusType_Timestamptz_String_WithOverrideUsesConfiguredTz() {
244+
// String input + per-field timezone → parse as naive wall-clock, apply configured tz
245+
Map<String, ZoneId> overrides = new HashMap<>();
246+
overrides.put("ts", ZoneId.of("Asia/Shanghai"));
247+
MilvusSinkConverter shanghaiConverter = new MilvusSinkConverter(
248+
GeometryConverter.PASSTHROUGH, overrides);
249+
FieldSchema schema = FieldSchema.builder()
250+
.name("ts").dataType(DataType.Timestamptz).build();
251+
Object result = shanghaiConverter.convertByMilvusType(schema, "2024-01-19 10:00:00");
252+
// 10:00 Shanghai = 02:00 UTC
253+
Assertions.assertEquals("2024-01-19T02:00:00Z", result);
254+
}
255+
256+
@Test
257+
public void testConvertByMilvusType_Timestamptz_String_WithoutOverrideUsesOriginalBehavior() {
258+
// String input + no per-field timezone → original Timestamp.valueOf behavior
259+
FieldSchema schema = FieldSchema.builder()
260+
.name("ts").dataType(DataType.Timestamptz).build();
261+
Object result = converter.convertByMilvusType(schema, "2024-01-19 10:00:00");
262+
Assertions.assertInstanceOf(String.class, result);
263+
Assertions.assertTrue(result.toString().contains("2024-01-19"));
264+
}
265+
266+
@Test
267+
public void testConvertByMilvusType_Timestamptz_String_Iso8601PassedThrough() {
268+
// ISO 8601 with offset — should pass through even with per-field timezone
269+
Map<String, ZoneId> overrides = new HashMap<>();
270+
overrides.put("ts", ZoneId.of("Asia/Shanghai"));
271+
MilvusSinkConverter shanghaiConverter = new MilvusSinkConverter(
272+
GeometryConverter.PASSTHROUGH, overrides);
273+
FieldSchema schema = FieldSchema.builder()
274+
.name("ts").dataType(DataType.Timestamptz).build();
275+
Object result = shanghaiConverter.convertByMilvusType(schema, "2024-01-19T10:00:00Z");
276+
// Already has offset Z — LocalDateTime.parse fails, falls through as-is
277+
Assertions.assertEquals("2024-01-19T10:00:00Z", result);
278+
}
279+
280+
@Test
281+
public void testConvertByMilvusType_Timestamptz_OffsetDateTime_IgnoresPerFieldTimezone() {
282+
// OffsetDateTime already carries offset — per-field timezone must NOT affect it
283+
Map<String, ZoneId> overrides = new HashMap<>();
284+
overrides.put("ts", ZoneId.of("Asia/Shanghai"));
285+
MilvusSinkConverter shanghaiConverter = new MilvusSinkConverter(
286+
GeometryConverter.PASSTHROUGH, overrides);
287+
FieldSchema schema = FieldSchema.builder()
288+
.name("ts").dataType(DataType.Timestamptz).build();
289+
OffsetDateTime odt = OffsetDateTime.of(2024, 1, 19, 10, 0, 0, 0, ZoneOffset.ofHours(5));
290+
Object result = shanghaiConverter.convertByMilvusType(schema, odt);
291+
// 10:00+05 = 05:00 UTC, regardless of per-field timezone setting
292+
Assertions.assertEquals("2024-01-19T05:00:00Z", result);
293+
}
294+
295+
@Test
296+
public void testConvertBySeaTunnelType_TimestampTz_LocalDateTime_UsesSystemDefault() {
297+
// convertBySeaTunnelType has no field name, always uses systemDefault
298+
LocalDateTime ldt = LocalDateTime.of(2024, 1, 19, 10, 0, 0);
299+
Object result = converter.convertBySeaTunnelType(
300+
LocalTimeType.OFFSET_DATE_TIME_TYPE, false, ldt);
301+
Assertions.assertInstanceOf(String.class, result);
302+
String expected = ldt.atZone(ZoneId.systemDefault()).toInstant().toString();
303+
Assertions.assertEquals(expected, result);
304+
}
305+
137306
// --- Consistency: both paths return same type for same input ---
138307

139308
@Test

0 commit comments

Comments
 (0)