diff --git a/src/array_decoder/timestamp.rs b/src/array_decoder/timestamp.rs index 35ad3ad2..41e21250 100644 --- a/src/array_decoder/timestamp.rs +++ b/src/array_decoder/timestamp.rs @@ -264,39 +264,62 @@ impl ArrayBatchDecoder for TimestampOffsetArrayDecoder let convert_timezone = |ts| { // Convert from writer timezone to reader timezone (which we default to UTC) // TODO: more efficient way of doing this? - if self.has_same_tz_rules { - return Some(ts); - } - let microseconds_in_timeunit = match T::UNIT { - TimeUnit::Second => 1_000_000, - TimeUnit::Millisecond => 1_000, - TimeUnit::Microsecond => 1, - TimeUnit::Nanosecond => -1, // not used in this case - }; + // 0001-01-01 00:00:00 + const BOUNDARY_MICROS: i64 = -62135798400000000; + const OFFSET_MICROS: i64 = 172457000000; match T::UNIT { - TimeUnit::Second | TimeUnit::Millisecond | TimeUnit::Microsecond => self - .writer_tz - .timestamp_micros(ts * microseconds_in_timeunit) - .single() - .and_then(|dt| { - dt.naive_local() - .and_local_timezone(self.reader_tz) - .single() - .map(|dt_in_reader_tz| { - dt_in_reader_tz.timestamp_micros() / microseconds_in_timeunit - }) - }), - TimeUnit::Nanosecond => self - .writer_tz - .timestamp_nanos(ts) - .naive_local() - .and_utc() - .naive_local() - .and_local_timezone(self.reader_tz) - .single() - .and_then(|dt_in_reader_tz| dt_in_reader_tz.timestamp_nanos_opt()), + TimeUnit::Second | TimeUnit::Millisecond | TimeUnit::Microsecond => { + let microseconds_in_timeunit = match T::UNIT { + TimeUnit::Second => 1_000_000, + TimeUnit::Millisecond => 1_000, + TimeUnit::Microsecond => 1, + _ => unreachable!(), + }; + + if self.has_same_tz_rules { + let timestamp_micros = ts * microseconds_in_timeunit; + let adjusted_micros = if timestamp_micros == BOUNDARY_MICROS { + timestamp_micros + OFFSET_MICROS + } else { + timestamp_micros + }; + return Some(adjusted_micros / microseconds_in_timeunit); + } + + self.writer_tz + .timestamp_micros(ts * microseconds_in_timeunit) + .single() + .and_then(|dt| { + dt.naive_local() + .and_local_timezone(self.reader_tz) + .single() + .map(|dt_in_reader_tz| { + let timestamp_micros = dt_in_reader_tz.timestamp_micros(); + let adjusted_micros = if timestamp_micros == BOUNDARY_MICROS { + timestamp_micros + OFFSET_MICROS + } else { + timestamp_micros + }; + adjusted_micros / microseconds_in_timeunit + }) + }) + } + TimeUnit::Nanosecond => { + if self.has_same_tz_rules { + return Some(ts); + } + + self.writer_tz + .timestamp_nanos(ts) + .naive_local() + .and_utc() + .naive_local() + .and_local_timezone(self.reader_tz) + .single() + .and_then(|dt_in_reader_tz| dt_in_reader_tz.timestamp_nanos_opt()) + } } }; let array = array