Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions src/abstractdataframe/abstractdataframe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -735,34 +735,36 @@ function _describe(df::AbstractDataFrame, stats::AbstractVector)
data = DataFrame()
data.variable = propertynames(df)

predefined_funs_local = predefined_funs

# An array of Dicts for summary statistics
col_stats_dicts = map(eachcol(df)) do col
if eltype(col) >: Missing
t = skipmissing(col)
d = get_stats(t, predefined_funs)
d = get_stats(t, predefined_funs_local)
get_stats!(d, t, custom_funs)
else
d = get_stats(col, predefined_funs)
d = get_stats(col, predefined_funs_local)
get_stats!(d, col, custom_funs)
end

if :nmissing in predefined_funs
if :nmissing in predefined_funs_local
d[:nmissing] = count(ismissing, col)
end

if :nnonmissing in predefined_funs
if :nnonmissing in predefined_funs_local
d[:nnonmissing] = count(!ismissing, col)
end

if :first in predefined_funs
if :first in predefined_funs_local
d[:first] = isempty(col) ? nothing : first(col)
end

if :last in predefined_funs
if :last in predefined_funs_local
d[:last] = isempty(col) ? nothing : last(col)
end

if :eltype in predefined_funs
if :eltype in predefined_funs_local
d[:eltype] = eltype(col)
end

Expand Down
6 changes: 3 additions & 3 deletions src/abstractdataframe/iteration.jl
Original file line number Diff line number Diff line change
Expand Up @@ -713,11 +713,11 @@ function Base.reduce(::typeof(vcat),

if source !== nothing
len = length(dfs)
if source isa SymbolOrString
col, vals = source, 1:len
col, vals = if source isa SymbolOrString
source, 1:len
else
@assert source isa Pair{<:SymbolOrString,<:AbstractVector}
col, vals = source
source
end

if columnindex(res, col) > 0
Expand Down
2 changes: 1 addition & 1 deletion src/abstractdataframe/subset.jl
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ function subset!(gdf::GroupedDataFrame, @nospecialize(args...); skipmissing::Boo
end

# update GroupedDataFrame indices in a thread safe way
Threads.lock(lazy_lock) do
Base.@lock lazy_lock begin
gdf.groups = newgroups
gdf.idx = nothing
gdf.starts = nothing
Expand Down
13 changes: 8 additions & 5 deletions src/dataframe/dataframe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -216,15 +216,17 @@ mutable struct DataFrame <: AbstractDataFrame
end
len == -1 && (len = 1) # we got no vectors so make one row of scalars

len_val = len
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we call this `len_local`, as in other places, to make it easier to guess why we do this?


# we write into columns as we know that it is guaranteed
# that it was freshly allocated in the outer constructor
if copycols && len >= 100_000 && length(columns) > 1 && Threads.nthreads() > 1
if copycols && len_val >= 100_000 && length(columns) > 1 && Threads.nthreads() > 1
@sync for i in eachindex(columns)
@spawn columns[i] = _preprocess_column(columns[i], len, copycols)
@spawn columns[i] = _preprocess_column(columns[i], len_val, copycols)
end
else
for i in eachindex(columns)
columns[i] = _preprocess_column(columns[i], len, copycols)
columns[i] = _preprocess_column(columns[i], len_val, copycols)
end
end

Expand Down Expand Up @@ -901,8 +903,9 @@ function _deleteat!_helper(df::DataFrame, drop)
return df
end

if any(c -> c === drop || Base.mightalias(c, drop), cols)
drop = copy(drop)
drop_local = drop
if any(c -> c === drop_local || Base.mightalias(c, drop_local), cols)
drop = copy(drop_local)
end

n = nrow(df)
Expand Down
60 changes: 31 additions & 29 deletions src/groupeddataframe/complextransforms.jl
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,29 @@ function _combine_with_first((first,)::Ref{Any},
@assert incols isa Union{Nothing, AbstractVector, Tuple, NamedTuple}
extrude = false

firstrow = first
wrapped_first = nothing
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this assignment needed? (We make sure not to access `wrapped_first` if `extrude` is `false`.)

lgd = length(gd)
if first isa AbstractDataFrame
if firstrow isa AbstractDataFrame
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the `eltys` assignment be moved outside of the `if` (as you did above in the src/abstractdataframe/iteration.jl change)?

n = 0
eltys = eltype.(eachcol(first))
elseif first isa NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}
eltys = eltype.(eachcol(firstrow))
elseif firstrow isa NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}
n = 0
eltys = map(eltype, first)
elseif first isa DataFrameRow
eltys = map(eltype, firstrow)
elseif firstrow isa DataFrameRow
n = lgd
eltys = [eltype(parent(first)[!, i]) for i in parentcols(index(first))]
elseif first isa Tables.AbstractRow
eltys = [eltype(parent(firstrow)[!, i]) for i in parentcols(index(firstrow))]
elseif firstrow isa Tables.AbstractRow
n = lgd
eltys = [typeof(Tables.getcolumn(first, name)) for name in Tables.columnnames(first)]
elseif !firstmulticol && first[1] isa Union{AbstractArray{<:Any, 0}, Ref}
eltys = [typeof(Tables.getcolumn(firstrow, name)) for name in Tables.columnnames(firstrow)]
elseif !firstmulticol && firstrow[1] isa Union{AbstractArray{<:Any, 0}, Ref}
extrude = true
first = wrap_row(first[1], firstcoltype(firstmulticol))
wrapped_first = wrap_row(firstrow[1], firstcoltype(firstmulticol))
n = lgd
eltys = (typeof(first[1]),)
eltys = (typeof(wrapped_first[1]),)
else # other NamedTuple giving a single row
n = lgd
eltys = map(typeof, first)
eltys = map(typeof, firstrow)
if any(x -> x <: AbstractVector, eltys)
throw(ArgumentError("mixing single values and vectors in a named tuple is not allowed"))
end
Expand All @@ -65,19 +67,21 @@ function _combine_with_first((first,)::Ref{Any},
sizehint!(idx, lgd)

local initialcols
firstrow_local = extrude ? wrapped_first : firstrow

let eltys=eltys, n=n # Workaround for julia#15276
initialcols = ntuple(i -> Tables.allocatecolumn(eltys[i], n), _ncol(first))
initialcols = ntuple(i -> Tables.allocatecolumn(eltys[i], n), _ncol(firstrow_local))
end
targetcolnames = first isa Tables.AbstractRow ?
tuple(Tables.columnnames(first)...) :
tuple(propertynames(first)...)
if !extrude && first isa Union{AbstractDataFrame,
targetcolnames = firstrow_local isa Tables.AbstractRow ?
tuple(Tables.columnnames(firstrow_local)...) :
tuple(propertynames(firstrow_local)...)
if !extrude && firstrow_local isa Union{AbstractDataFrame,
NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}}
outcols, finalcolnames = _combine_tables_with_first!(first, initialcols, idx, 1, 1,
outcols, finalcolnames = _combine_tables_with_first!(firstrow_local, initialcols, idx, 1, 1,
f, gd, incols, targetcolnames,
firstcoltype(firstmulticol))
else
outcols, finalcolnames = _combine_rows_with_first!(Ref{Any}(first),
outcols, finalcolnames = _combine_rows_with_first!(Ref{Any}(firstrow_local),
Ref{Any}(initialcols),
Ref{Any}(f),
gd,
Expand Down Expand Up @@ -144,8 +148,7 @@ function _combine_rows_with_first_task!(tid::Integer,
j = fill_row!(row, outcols, i, 1, colnames)
if j !== nothing # Need to widen column
# If another thread is already widening outcols, wait until it's done
lock(widen_type_lock)
try
Base.@lock widen_type_lock begin
newoutcols = outcolsref[]
# Workaround for julia#15276
newoutcols = let i=i, j=j, newoutcols=newoutcols, row=row
Expand All @@ -172,8 +175,6 @@ function _combine_rows_with_first_task!(tid::Integer,

outcolsref[] = newoutcols
type_widened[tid] = false
finally
unlock(widen_type_lock)
end
return _combine_rows_with_first_task!(tid, rowstart, rowend, i+1, newoutcols, outcolsref,
type_widened, widen_type_lock,
Expand All @@ -185,7 +186,7 @@ function _combine_rows_with_first_task!(tid::Integer,
# This doesn't have to happen immediately (hence type_widened isn't atomic),
# but the more we wait the more data will have to be copied
if type_widened[tid]
lock(widen_type_lock) do
Base.@lock widen_type_lock begin
type_widened[tid] = false
newoutcols = outcolsref[]
for k in 1:length(outcols)
Expand Down Expand Up @@ -276,13 +277,14 @@ function _combine_rows_with_first!((firstrow,)::Ref{Any},
partitions = (2:len,)
end
widen_type_lock = ReentrantLock()
outcolsref = Ref{NTuple{<:Any, AbstractVector}}(outcols)
outcols_local = outcols
outcolsref = Ref{NTuple{<:Any, AbstractVector}}(outcols_local)
type_widened = fill(false, length(partitions))
tasks = Vector{Task}(undef, length(partitions))
for (tid, idx) in enumerate(partitions)
tasks[tid] =
@spawn_or_run_task threads _combine_rows_with_first_task!(tid, first(idx), last(idx), first(idx),
outcols, outcolsref,
outcols_local, outcolsref,
type_widened, widen_type_lock,
f, gd, starts, ends, incols, colnames,
firstcoltype(firstmulticol))
Expand Down Expand Up @@ -378,10 +380,10 @@ function _combine_tables_with_first!(first::Union{AbstractDataFrame,
_ncol(rows) == 0 && continue
if isempty(colnames)
newcolnames = tuple(propertynames(rows)...)
if rows isa AbstractDataFrame
eltys = eltype.(eachcol(rows))
eltys = if rows isa AbstractDataFrame
eltype.(eachcol(rows))
else
eltys = map(eltype, rows)
map(eltype, rows)
end
initialcols = ntuple(i -> Tables.allocatecolumn(eltys[i], 0), _ncol(rows))
return _combine_tables_with_first!(rows, initialcols, idx, i, 1,
Expand Down
58 changes: 31 additions & 27 deletions src/groupeddataframe/splitapplycombine.jl
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ function _combine_prepare_norm(gd::GroupedDataFrame,
@assert length(gd_keys) > 0 || idx == gd.idx
# in this case we are sure that the result GroupedDataFrame has the
# same structure as the source except that grouping columns are at the start
return Threads.lock(gd.lazy_lock) do
Base.@lock gd.lazy_lock begin
return GroupedDataFrame(newparent, copy(gd.cols), gd.groups,
getfield(gd, :idx), getfield(gd, :starts),
getfield(gd, :ends), gd.ngroups,
Expand Down Expand Up @@ -298,19 +298,19 @@ function _combine_process_proprow((cs_i,)::Ref{Any},

# introduce outcol1 and outcol2 as without it outcol is boxed
# since it is later used inside the anonymous function we return
if getfield(gd, :idx) === nothing
outcol = if getfield(gd, :idx) === nothing
outcol1 = zeros(Float64, length(gd) + 1)
@inbounds @simd for gix in gd.groups
outcol1[gix + 1] += 1
end
popfirst!(outcol1)
outcol1 ./= sum(outcol1)
outcol = outcol1
outcol1
else
outcol2 = Vector{Float64}(undef, length(gd))
outcol2 .= gd.ends .- gd.starts .+ 1
outcol2 ./= sum(outcol2)
outcol = outcol2
outcol2
end

return function()
Expand Down Expand Up @@ -384,7 +384,7 @@ function _combine_process_callable(wcs_i::Ref{Any},

if !(firstres isa Union{AbstractVecOrMat, AbstractDataFrame,
NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}})
lock(gd.lazy_lock) do
Base.@lock gd.lazy_lock begin
# if idx_agg was not computed yet it is NOTHING_IDX_AGG
# in this case if we are not passed a vector compute it.
if idx_agg[] === NOTHING_IDX_AGG
Expand All @@ -396,6 +396,7 @@ function _combine_process_callable(wcs_i::Ref{Any},
end
end
@assert length(outcols) == length(nms)
idx_local = idx
return function()
for j in eachindex(outcols)
outcol = outcols[j]
Expand All @@ -408,11 +409,11 @@ function _combine_process_callable(wcs_i::Ref{Any},
# we have seen this col but it is not allowed to replace it
optional || throw(ArgumentError("duplicate output column name: :$out_col_name"))
@assert trans_res[loc].optional && trans_res[loc].name == out_col_name
trans_res[loc] = TransformationResult(idx, outcol, out_col_name, optional_i, 0)
trans_res[loc] = TransformationResult(idx_local, outcol, out_col_name, optional_i, 0)
seen_cols[out_col_name] = (optional_i, loc)
end
else
push!(trans_res, TransformationResult(idx, outcol, out_col_name, optional_i, 0))
push!(trans_res, TransformationResult(idx_local, outcol, out_col_name, optional_i, 0))
seen_cols[out_col_name] = (optional_i, length(trans_res))
end
end
Expand Down Expand Up @@ -443,7 +444,7 @@ function _combine_process_pair_symbol(optional_i::Bool,
end
# if idx_agg was not computed yet it is NOTHING_IDX_AGG
# in this case if we are not passed a vector compute it.
lock(gd.lazy_lock) do
Base.@lock gd.lazy_lock begin
if !(firstres isa AbstractVector) && idx_agg[] === NOTHING_IDX_AGG
idx_agg[] = Vector{Int}(undef, length(gd))
fillfirst!(nothing, idx_agg[], 1:length(gd.groups), gd)
Expand All @@ -463,13 +464,13 @@ function _combine_process_pair_symbol(optional_i::Bool,
@assert length(outcols) == 1
outcol = outcols[1]

if (source_cols isa Int ||
metacol = if (source_cols isa Int ||
(source_cols isa AbstractVector{Int} && length(source_cols) == 1)) &&
(only(source_cols) == columnindex(parent(gd), out_col_name) ||
only(wfun) === identity || only(wfun) === copy)
metacol = only(source_cols)
only(source_cols)
else
metacol = 0
0
end

return function()
Expand Down Expand Up @@ -546,7 +547,7 @@ function _combine_process_pair_astable(optional_i::Bool,
wincols, threads)
if !(firstres isa Union{AbstractVecOrMat, AbstractDataFrame,
NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}})
lock(gd.lazy_lock) do
Base.@lock gd.lazy_lock begin
# if idx_agg was not computed yet it is nothing
# in this case if we are not passed a vector compute it.
if idx_agg[] === NOTHING_IDX_AGG
Expand All @@ -568,24 +569,27 @@ function _combine_process_pair_astable(optional_i::Bool,
nms = out_col_name
end
end
outcols_local = outcols
nms_local = nms
idx_local = idx
return function()
for j in eachindex(outcols)
outcol = outcols[j]
out_col_name = nms[j]
if haskey(seen_cols, out_col_name)
optional, loc = seen_cols[out_col_name]
for j in eachindex(outcols_local)
outcol = outcols_local[j]
out_col_name_j = nms_local[j]
if haskey(seen_cols, out_col_name_j)
optional, loc = seen_cols[out_col_name_j]
# if column was seen and it is optional now ignore it
if !optional_i
optional, loc = seen_cols[out_col_name]
optional, loc = seen_cols[out_col_name_j]
# we have seen this col but it is not allowed to replace it
optional || throw(ArgumentError("duplicate output column name: :$out_col_name"))
@assert trans_res[loc].optional && trans_res[loc].name == out_col_name
trans_res[loc] = TransformationResult(idx, outcol, out_col_name, optional_i, 0)
seen_cols[out_col_name] = (optional_i, loc)
optional || throw(ArgumentError("duplicate output column name: :$out_col_name_j"))
@assert trans_res[loc].optional && trans_res[loc].name == out_col_name_j
trans_res[loc] = TransformationResult(idx_local, outcol, out_col_name_j, optional_i, 0)
seen_cols[out_col_name_j] = (optional_i, loc)
end
else
push!(trans_res, TransformationResult(idx, outcol, out_col_name, optional_i, 0))
seen_cols[out_col_name] = (optional_i, length(trans_res))
push!(trans_res, TransformationResult(idx_local, outcol, out_col_name_j, optional_i, 0))
seen_cols[out_col_name_j] = (optional_i, length(trans_res))
end
end
end
Expand Down Expand Up @@ -675,15 +679,15 @@ function _combine(gd::GroupedDataFrame,
return Int[], DataFrame(), Int[]
end

if keeprows
idx_keeprows = if keeprows
if nrow(parent(gd)) > 0 && minimum(gd.groups) == 0
throw(ArgumentError("select and transform do not support " *
"`GroupedDataFrame`s from which some groups have "*
"been dropped (including skipmissing=true)"))
end
idx_keeprows = prepare_idx_keeprows(gd.idx, gd.starts, gd.ends, nrow(parent(gd)))
prepare_idx_keeprows(gd.idx, gd.starts, gd.ends, nrow(parent(gd)))
else
idx_keeprows = Int[]
Int[]
end

idx_agg = Ref(NOTHING_IDX_AGG)
Expand Down
Loading
Loading