diff --git a/src/abstractdataframe/abstractdataframe.jl b/src/abstractdataframe/abstractdataframe.jl index 68ad55a54..d7c3365f4 100644 --- a/src/abstractdataframe/abstractdataframe.jl +++ b/src/abstractdataframe/abstractdataframe.jl @@ -735,34 +735,36 @@ function _describe(df::AbstractDataFrame, stats::AbstractVector) data = DataFrame() data.variable = propertynames(df) + predefined_funs_local = predefined_funs + # An array of Dicts for summary statistics col_stats_dicts = map(eachcol(df)) do col if eltype(col) >: Missing t = skipmissing(col) - d = get_stats(t, predefined_funs) + d = get_stats(t, predefined_funs_local) get_stats!(d, t, custom_funs) else - d = get_stats(col, predefined_funs) + d = get_stats(col, predefined_funs_local) get_stats!(d, col, custom_funs) end - if :nmissing in predefined_funs + if :nmissing in predefined_funs_local d[:nmissing] = count(ismissing, col) end - if :nnonmissing in predefined_funs + if :nnonmissing in predefined_funs_local d[:nnonmissing] = count(!ismissing, col) end - if :first in predefined_funs + if :first in predefined_funs_local d[:first] = isempty(col) ? nothing : first(col) end - if :last in predefined_funs + if :last in predefined_funs_local d[:last] = isempty(col) ? nothing : last(col) end - if :eltype in predefined_funs + if :eltype in predefined_funs_local d[:eltype] = eltype(col) end diff --git a/src/abstractdataframe/iteration.jl b/src/abstractdataframe/iteration.jl index 7954d011e..27034555a 100644 --- a/src/abstractdataframe/iteration.jl +++ b/src/abstractdataframe/iteration.jl @@ -713,11 +713,11 @@ function Base.reduce(::typeof(vcat), if source !== nothing len = length(dfs) - if source isa SymbolOrString - col, vals = source, 1:len + col, vals = if source isa SymbolOrString + source, 1:len else @assert source isa Pair{<:SymbolOrString,<:AbstractVector} - col, vals = source + source end if columnindex(res, col) > 0 diff --git a/src/abstractdataframe/subset.jl b/src/abstractdataframe/subset.jl index 6edd21dcf..9ff5d2a3c 100644 --- a/src/abstractdataframe/subset.jl +++ b/src/abstractdataframe/subset.jl @@ -515,7 +515,7 @@ function subset!(gdf::GroupedDataFrame, @nospecialize(args...); skipmissing::Boo end # update GroupedDataFrame indices in a thread safe way - Threads.lock(lazy_lock) do + Base.@lock lazy_lock begin gdf.groups = newgroups gdf.idx = nothing gdf.starts = nothing diff --git a/src/dataframe/dataframe.jl b/src/dataframe/dataframe.jl index 2b68ea261..9bbcfbf82 100755 --- a/src/dataframe/dataframe.jl +++ b/src/dataframe/dataframe.jl @@ -216,15 +216,17 @@ mutable struct DataFrame <: AbstractDataFrame end len == -1 && (len = 1) # we got no vectors so make one row of scalars + len_local = len + # we write into columns as we know that it is guaranteed # that it was freshly allocated in the outer constructor - if copycols && len >= 100_000 && length(columns) > 1 && Threads.nthreads() > 1 + if copycols && len_local >= 100_000 && length(columns) > 1 && Threads.nthreads() > 1 @sync for i in eachindex(columns) - @spawn columns[i] = _preprocess_column(columns[i], len, copycols) + @spawn columns[i] = _preprocess_column(columns[i], len_local, copycols) end else for i in eachindex(columns) - columns[i] = _preprocess_column(columns[i], len, copycols) + columns[i] = _preprocess_column(columns[i], len_local, copycols) end end @@ -901,8 +903,9 @@ function _deleteat!_helper(df::DataFrame, drop) return df end - if any(c -> c === drop || Base.mightalias(c, drop), cols) - drop = copy(drop) + drop_local = drop + if any(c -> c === drop_local || Base.mightalias(c, drop_local), cols) + drop = copy(drop_local) end n = nrow(df) diff --git a/src/groupeddataframe/complextransforms.jl b/src/groupeddataframe/complextransforms.jl index 1a907de6f..19ddb3ac0 100644 --- a/src/groupeddataframe/complextransforms.jl +++ b/src/groupeddataframe/complextransforms.jl @@ -32,30 +32,32 @@ function _combine_with_first((first,)::Ref{Any}, @assert incols isa Union{Nothing, AbstractVector, Tuple, NamedTuple} extrude = false + firstrow = first lgd = length(gd) - if first isa AbstractDataFrame + eltys = if firstrow isa AbstractDataFrame n = 0 - eltys = eltype.(eachcol(first)) - elseif first isa NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}} + eltype.(eachcol(firstrow)) + elseif firstrow isa NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}} n = 0 - eltys = map(eltype, first) - elseif first isa DataFrameRow + map(eltype, firstrow) + elseif firstrow isa DataFrameRow n = lgd - eltys = [eltype(parent(first)[!, i]) for i in parentcols(index(first))] - elseif first isa Tables.AbstractRow + [eltype(parent(firstrow)[!, i]) for i in parentcols(index(firstrow))] + elseif firstrow isa Tables.AbstractRow n = lgd - eltys = [typeof(Tables.getcolumn(first, name)) for name in Tables.columnnames(first)] - elseif !firstmulticol && first[1] isa Union{AbstractArray{<:Any, 0}, Ref} + [typeof(Tables.getcolumn(firstrow, name)) for name in Tables.columnnames(firstrow)] + elseif !firstmulticol && firstrow[1] isa Union{AbstractArray{<:Any, 0}, Ref} extrude = true - first = wrap_row(first[1], firstcoltype(firstmulticol)) + wrapped_first = wrap_row(firstrow[1], firstcoltype(firstmulticol)) n = lgd - eltys = (typeof(first[1]),) + (typeof(wrapped_first[1]),) else # other NamedTuple giving a single row n = lgd - eltys = map(typeof, first) + eltys = map(typeof, firstrow) if any(x -> x <: AbstractVector, eltys) throw(ArgumentError("mixing single values and vectors in a named tuple is not allowed")) end + eltys end idx = idx_agg === NOTHING_IDX_AGG ? Vector{Int}(undef, n) : idx_agg @@ -65,19 +67,21 @@ function _combine_with_first((first,)::Ref{Any}, sizehint!(idx, lgd) local initialcols + firstrow_local = extrude ? wrapped_first : firstrow + let eltys=eltys, n=n # Workaround for julia#15276 - initialcols = ntuple(i -> Tables.allocatecolumn(eltys[i], n), _ncol(first)) + initialcols = ntuple(i -> Tables.allocatecolumn(eltys[i], n), _ncol(firstrow_local)) end - targetcolnames = first isa Tables.AbstractRow ? - tuple(Tables.columnnames(first)...) : - tuple(propertynames(first)...) - if !extrude && first isa Union{AbstractDataFrame, + targetcolnames = firstrow_local isa Tables.AbstractRow ? + tuple(Tables.columnnames(firstrow_local)...) : + tuple(propertynames(firstrow_local)...) + if !extrude && firstrow_local isa Union{AbstractDataFrame, NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}} - outcols, finalcolnames = _combine_tables_with_first!(first, initialcols, idx, 1, 1, + outcols, finalcolnames = _combine_tables_with_first!(firstrow_local, initialcols, idx, 1, 1, f, gd, incols, targetcolnames, firstcoltype(firstmulticol)) else - outcols, finalcolnames = _combine_rows_with_first!(Ref{Any}(first), + outcols, finalcolnames = _combine_rows_with_first!(Ref{Any}(firstrow_local), Ref{Any}(initialcols), Ref{Any}(f), gd, @@ -144,8 +148,7 @@ function _combine_rows_with_first_task!(tid::Integer, j = fill_row!(row, outcols, i, 1, colnames) if j !== nothing # Need to widen column # If another thread is already widening outcols, wait until it's done - lock(widen_type_lock) - try + Base.@lock widen_type_lock begin newoutcols = outcolsref[] # Workaround for julia#15276 newoutcols = let i=i, j=j, newoutcols=newoutcols, row=row @@ -172,8 +175,6 @@ function _combine_rows_with_first_task!(tid::Integer, outcolsref[] = newoutcols type_widened[tid] = false - finally - unlock(widen_type_lock) end return _combine_rows_with_first_task!(tid, rowstart, rowend, i+1, newoutcols, outcolsref, type_widened, widen_type_lock, @@ -185,7 +186,7 @@ function _combine_rows_with_first_task!(tid::Integer, # This doesn't have to happen immediately (hence type_widened isn't atomic), # but the more we wait the more data will have to be copied if type_widened[tid] - lock(widen_type_lock) do + Base.@lock widen_type_lock begin type_widened[tid] = false newoutcols = outcolsref[] for k in 1:length(outcols) @@ -276,13 +277,14 @@ function _combine_rows_with_first!((firstrow,)::Ref{Any}, partitions = (2:len,) end widen_type_lock = ReentrantLock() - outcolsref = Ref{NTuple{<:Any, AbstractVector}}(outcols) + outcols_local = outcols + outcolsref = Ref{NTuple{<:Any, AbstractVector}}(outcols_local) type_widened = fill(false, length(partitions)) tasks = Vector{Task}(undef, length(partitions)) for (tid, idx) in enumerate(partitions) tasks[tid] = @spawn_or_run_task threads _combine_rows_with_first_task!(tid, first(idx), last(idx), first(idx), - outcols, outcolsref, + outcols_local, outcolsref, type_widened, widen_type_lock, f, gd, starts, ends, incols, colnames, firstcoltype(firstmulticol)) @@ -378,10 +380,10 @@ function _combine_tables_with_first!(first::Union{AbstractDataFrame, _ncol(rows) == 0 && continue if isempty(colnames) newcolnames = tuple(propertynames(rows)...) - if rows isa AbstractDataFrame - eltys = eltype.(eachcol(rows)) + eltys = if rows isa AbstractDataFrame + eltype.(eachcol(rows)) else - eltys = map(eltype, rows) + map(eltype, rows) end initialcols = ntuple(i -> Tables.allocatecolumn(eltys[i], 0), _ncol(rows)) return _combine_tables_with_first!(rows, initialcols, idx, i, 1, diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index 1747923df..acea4a037 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -124,7 +124,7 @@ function _combine_prepare_norm(gd::GroupedDataFrame, @assert length(gd_keys) > 0 || idx == gd.idx # in this case we are sure that the result GroupedDataFrame has the # same structure as the source except that grouping columns are at the start - return Threads.lock(gd.lazy_lock) do + Base.@lock gd.lazy_lock begin return GroupedDataFrame(newparent, copy(gd.cols), gd.groups, getfield(gd, :idx), getfield(gd, :starts), getfield(gd, :ends), gd.ngroups, @@ -298,19 +298,19 @@ function _combine_process_proprow((cs_i,)::Ref{Any}, # introduce outcol1 and outcol2 as without it outcol is boxed # since it is later used inside the anonymous function we return - if getfield(gd, :idx) === nothing + outcol = if getfield(gd, :idx) === nothing outcol1 = zeros(Float64, length(gd) + 1) @inbounds @simd for gix in gd.groups outcol1[gix + 1] += 1 end popfirst!(outcol1) outcol1 ./= sum(outcol1) - outcol = outcol1 + outcol1 else outcol2 = Vector{Float64}(undef, length(gd)) outcol2 .= gd.ends .- gd.starts .+ 1 outcol2 ./= sum(outcol2) - outcol = outcol2 + outcol2 end return function() @@ -384,7 +384,7 @@ function _combine_process_callable(wcs_i::Ref{Any}, if !(firstres isa Union{AbstractVecOrMat, AbstractDataFrame, NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}}) - lock(gd.lazy_lock) do + Base.@lock gd.lazy_lock begin # if idx_agg was not computed yet it is NOTHING_IDX_AGG # in this case if we are not passed a vector compute it. if idx_agg[] === NOTHING_IDX_AGG @@ -396,6 +396,7 @@ function _combine_process_callable(wcs_i::Ref{Any}, end end @assert length(outcols) == length(nms) + idx_local = idx return function() for j in eachindex(outcols) outcol = outcols[j] @@ -408,11 +409,11 @@ function _combine_process_callable(wcs_i::Ref{Any}, # we have seen this col but it is not allowed to replace it optional || throw(ArgumentError("duplicate output column name: :$out_col_name")) @assert trans_res[loc].optional && trans_res[loc].name == out_col_name - trans_res[loc] = TransformationResult(idx, outcol, out_col_name, optional_i, 0) + trans_res[loc] = TransformationResult(idx_local, outcol, out_col_name, optional_i, 0) seen_cols[out_col_name] = (optional_i, loc) end else - push!(trans_res, TransformationResult(idx, outcol, out_col_name, optional_i, 0)) + push!(trans_res, TransformationResult(idx_local, outcol, out_col_name, optional_i, 0)) seen_cols[out_col_name] = (optional_i, length(trans_res)) end end @@ -443,7 +444,7 @@ function _combine_process_pair_symbol(optional_i::Bool, end # if idx_agg was not computed yet it is NOTHING_IDX_AGG # in this case if we are not passed a vector compute it. - lock(gd.lazy_lock) do + Base.@lock gd.lazy_lock begin if !(firstres isa AbstractVector) && idx_agg[] === NOTHING_IDX_AGG idx_agg[] = Vector{Int}(undef, length(gd)) fillfirst!(nothing, idx_agg[], 1:length(gd.groups), gd) @@ -463,13 +464,13 @@ function _combine_process_pair_symbol(optional_i::Bool, @assert length(outcols) == 1 outcol = outcols[1] - if (source_cols isa Int || + metacol = if (source_cols isa Int || (source_cols isa AbstractVector{Int} && length(source_cols) == 1)) && (only(source_cols) == columnindex(parent(gd), out_col_name) || only(wfun) === identity || only(wfun) === copy) - metacol = only(source_cols) + only(source_cols) else - metacol = 0 + 0 end return function() @@ -546,7 +547,7 @@ function _combine_process_pair_astable(optional_i::Bool, wincols, threads) if !(firstres isa Union{AbstractVecOrMat, AbstractDataFrame, NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}}) - lock(gd.lazy_lock) do + Base.@lock gd.lazy_lock begin # if idx_agg was not computed yet it is nothing # in this case if we are not passed a vector compute it. if idx_agg[] === NOTHING_IDX_AGG @@ -568,24 +569,27 @@ function _combine_process_pair_astable(optional_i::Bool, nms = out_col_name end end + outcols_local = outcols + nms_local = nms + idx_local = idx return function() - for j in eachindex(outcols) - outcol = outcols[j] - out_col_name = nms[j] - if haskey(seen_cols, out_col_name) - optional, loc = seen_cols[out_col_name] + for j in eachindex(outcols_local) + outcol = outcols_local[j] + out_col_name_j = nms_local[j] + if haskey(seen_cols, out_col_name_j) + optional, loc = seen_cols[out_col_name_j] # if column was seen and it is optional now ignore it if !optional_i - optional, loc = seen_cols[out_col_name] + optional, loc = seen_cols[out_col_name_j] # we have seen this col but it is not allowed to replace it - optional || throw(ArgumentError("duplicate output column name: :$out_col_name")) - @assert trans_res[loc].optional && trans_res[loc].name == out_col_name - trans_res[loc] = TransformationResult(idx, outcol, out_col_name, optional_i, 0) - seen_cols[out_col_name] = (optional_i, loc) + optional || throw(ArgumentError("duplicate output column name: :$out_col_name_j")) + @assert trans_res[loc].optional && trans_res[loc].name == out_col_name_j + trans_res[loc] = TransformationResult(idx_local, outcol, out_col_name_j, optional_i, 0) + seen_cols[out_col_name_j] = (optional_i, loc) end else - push!(trans_res, TransformationResult(idx, outcol, out_col_name, optional_i, 0)) - seen_cols[out_col_name] = (optional_i, length(trans_res)) + push!(trans_res, TransformationResult(idx_local, outcol, out_col_name_j, optional_i, 0)) + seen_cols[out_col_name_j] = (optional_i, length(trans_res)) end end end @@ -675,15 +679,15 @@ function _combine(gd::GroupedDataFrame, return Int[], DataFrame(), Int[] end - if keeprows + idx_keeprows = if keeprows if nrow(parent(gd)) > 0 && minimum(gd.groups) == 0 throw(ArgumentError("select and transform do not support " * "`GroupedDataFrame`s from which some groups have "* "been dropped (including skipmissing=true)")) end - idx_keeprows = prepare_idx_keeprows(gd.idx, gd.starts, gd.ends, nrow(parent(gd))) + prepare_idx_keeprows(gd.idx, gd.starts, gd.ends, nrow(parent(gd))) else - idx_keeprows = Int[] + Int[] end idx_agg = Ref(NOTHING_IDX_AGG) diff --git a/src/groupeddataframe/utils.jl b/src/groupeddataframe/utils.jl index 9d882fcf9..652d87acb 100644 --- a/src/groupeddataframe/utils.jl +++ b/src/groupeddataframe/utils.jl @@ -261,6 +261,24 @@ function row_group_slots!(cols::Tuple{Vararg{AbstractVector}}, return ngroups, rhashes, gslots, false end +function _reduce_or!(x::AbstractVector{Vector{Bool}}) + len = length(x) + if len < 2 + return + elseif len == 2 + x[1] .|= x[2] + else + xl = view(x, 1:len ÷ 2) + xr = view(x, len ÷ 2 + 1:len) + t1 = @spawn _reduce_or!(xl) + t2 = @spawn _reduce_or!(xr) + fetch(t1) + fetch(t2) + xl[1] .|= xr[1] + end + return +end + # Optimized method for arrays for which DataAPI.refpool is defined and returns an AbstractVector function row_group_slots!(cols::NTuple{N, AbstractVector}, refpools::NTuple{N, AbstractVector}, @@ -424,25 +442,7 @@ function row_group_slots!(cols::NTuple{N, AbstractVector}, end end - function reduce_or!(x::AbstractVector{Vector{Bool}}) - len = length(x) - if len < 2 - return - elseif len == 2 - x[1] .|= x[2] - else - xl = view(x, 1:len ÷ 2) - xr = view(x, len ÷ 2 + 1:len) - t1 = @spawn reduce_or!(xl) - t2 = @spawn reduce_or!(xr) - fetch(t1) - fetch(t2) - xl[1] .|= xr[1] - end - return - end - - reduce_or!(seen_vec) + _reduce_or!(seen_vec) # If some groups are unused, compress group indices to drop them # sum(seen) is faster than all(seen) when not short-circuiting, diff --git a/src/join/composer.jl b/src/join/composer.jl index 3cd4e90b3..f4ba5d853 100644 --- a/src/join/composer.jl +++ b/src/join/composer.jl @@ -256,14 +256,17 @@ function compose_inner_table(joiner::DataFrameJoiner, right_ixs = right_ixs[csp_r] end - if Threads.nthreads() > 1 && length(left_ixs) >= 100_000 - dfl_task = @spawn joiner.dfl[left_ixs, :] - dfr_noon_task = @spawn joiner.dfr[right_ixs, Not(joiner.right_on)] + left_ixs_final = left_ixs + right_ixs_final = right_ixs + + if Threads.nthreads() > 1 && length(left_ixs_final) >= 100_000 + dfl_task = @spawn joiner.dfl[left_ixs_final, :] + dfr_noon_task = @spawn joiner.dfr[right_ixs_final, Not(joiner.right_on)] dfl = fetch(dfl_task) dfr_noon = fetch(dfr_noon_task) else - dfl = joiner.dfl[left_ixs, :] - dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)] + dfl = joiner.dfl[left_ixs_final, :] + dfr_noon = joiner.dfr[right_ixs_final, Not(joiner.right_on)] end ncleft = ncol(dfl)