610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818 | def solve_VIE_2(*, kernel_values, g_values=None, time_step=1.0, coll_divs=2,
coll_choices=[0,1,2], return_polys=False, show_warnings=True):
r'''
Solve a Volterra integral equation of the second kind.
Finds $y(t)$ satisfying
$$y(t) = g(t) + \int_0^t K(t-s)\,y(s)\,ds$$
Parameters
----------
kernel_values : array_like of shape (N,) or (N, d, d)
Values of $K(s)$ at times $s = 0, h, 2h, \ldots, (N-1)h$, where $h$
is ``time_step``. Pass a 1-D array for scalar equations or a 3-D array
of shape ``(N, d, d)`` for $d$-dimensional vector equations.
g_values : array_like of shape (N,) or (N, d) or (N, d, m), optional
Right-hand side $g(t)$ sampled at the same times as ``kernel_values``.
For matrix-valued equations pass shape ``(N, d, m)`` to solve $m$
right-hand sides simultaneously. Defaults to zero.
time_step : float, optional
Spacing $h$ between consecutive sample times. Must be positive.
Default is 1.0.
coll_divs : int, optional
Number of collocation sub-intervals per mesh interval. Must be a
positive integer. Default is 2.
coll_choices : list of int, optional
Indices selecting the collocation nodes within each sub-interval.
Each entry $k$ corresponds to the node $k / c$ where $c$ =
``coll_divs``, placed in $[0, 1]$. Entries must be distinct integers
in $\{0, 1, \ldots, \text{coll\_divs}\}$. Default is ``[0, 1, 2]``.
return_polys : bool, optional
If ``True``, also return the piecewise polynomial solution.
Default is ``False``.
show_warnings : bool, optional
If ``True`` (default), print a warning when ``kernel_values`` is
truncated or when the Numba fallback is used.
Returns
-------
soln_values : ndarray of shape (N,) or (N, d) or (N, d, m)
Solution values $y(t)$ at the same times as the input arrays.
Returned when ``return_polys=False`` (default).
(soln_values, polys) : tuple
Returned when ``return_polys=True``. ``soln_values`` is as above.
For scalar equations, ``polys`` is a list of
`numpy.polynomial.Polynomial` objects, one per mesh interval, each
mapping $t$ to the polynomial approximation of $y(t)$ on that
interval. For vector equations, each element of ``polys`` is an object
array of shape ``(d,)`` (or ``(d, m)`` for matrix equations) containing
one polynomial per component.
Notes
-----
The length $N$ of the input arrays must satisfy
$N \equiv 1 \pmod{\text{coll\_divs}^2}$. If a longer array is supplied it
is truncated to the largest conforming length and a warning is printed
(unless ``show_warnings=False``).
The solver dispatches at runtime to a D-extension routine specialised for
the given collocation setting. For scalar equations, settings not compiled
into the extension fall back to a Numba-JIT implementation (requires the
``numba`` optional dependency); a warning is printed when the fallback is
used. For vector equations only the compiled settings are supported.
References
----------
.. [1] Brunner, H. *Collocation Methods for Volterra Integral and Related
Functional Differential Equations.* Cambridge University Press, 2004.
Section 2.2.
'''
# ------------------------------------------------------------------ complex dispatch
if _cplx.is_complex(kernel_values, g_values):
K_arr = np.asarray(kernel_values)
is_scalar = (K_arr.ndim == 1)
d_orig = 0 if is_scalar else K_arr.shape[1]
K_real = _cplx._block_kernel(K_arr)
g_real = _cplx._expand_g(np.asarray(g_values)) if g_values is not None else None
result = solve_VIE_2(
kernel_values=K_real, g_values=g_real,
time_step=time_step, coll_divs=coll_divs, coll_choices=coll_choices,
return_polys=return_polys, show_warnings=show_warnings)
if return_polys:
soln_real, polys_real = result
return (_cplx._recombine(soln_real, d_orig),
_cplx._recombine_polys(polys_real, d_orig))
return _cplx._recombine(result, d_orig)
kernel_values_ = np.asarray(kernel_values, dtype=float)
ndim = kernel_values_.ndim
if ndim not in (1, 3):
raise ValueError(
f"kernel_values must be 1-D (scalar) or 3-D (N, d, d), got shape {kernel_values_.shape}")
N_orig = len(kernel_values_)
N, kernel_values_ = _truncate_N(kernel_values_, coll_divs, show_warnings)
# ------------------------------------------------------------------ vector path
if ndim == 3:
_, d1, d2 = kernel_values_.shape
if d1 != d2:
raise ValueError(f"kernel_values must have shape (N, d, d), got {kernel_values_.shape}")
d = d1
if g_values is not None:
g_values_ = np.asarray(g_values, dtype=float)
if g_values_.ndim == 3: # matrix case: shape (N, d, m_cols)
m_cols = g_values_.shape[2]
if g_values_.shape[1] != d:
raise ValueError(
f"g_values shape {g_values_.shape} incompatible with kernel_values shape {kernel_values_.shape}")
g_cols = g_values_[:N]
def _col_vie2(j):
return solve_VIE_2(kernel_values=kernel_values_,
g_values=g_cols[:, :, j],
time_step=time_step, coll_divs=coll_divs,
coll_choices=coll_choices,
return_polys=return_polys,
show_warnings=show_warnings)
with ThreadPoolExecutor(max_workers=m_cols) as ex:
results = list(ex.map(_col_vie2, range(m_cols)))
if return_polys:
col_solns = [r[0] for r in results]
col_polys = [r[1] for r in results]
soln = np.stack(col_solns, axis=2)
mesh_divs = len(col_polys[0])
mat_polys = []
for n in range(mesh_divs):
arr = np.empty((d, m_cols), dtype=object)
for j in range(m_cols):
arr[:, j] = col_polys[j][n]
mat_polys.append(arr)
return (soln, mat_polys)
return np.stack(results, axis=2)
else:
if g_values_.shape != (N_orig, d):
raise ValueError(
f"g_values shape {g_values_.shape} incompatible with kernel_values shape {kernel_values_.shape}")
g_values_ = g_values_[:N]
else:
g_values_ = np.zeros((N, d), dtype=float)
assert coll_divs > 0, "coll_divs must be a positive integer"
assert all(isinstance(c, int) for c in coll_choices), "coll_choices must be a list of integers"
assert all(coll_choices.count(c) <= 1 for c in coll_choices), \
"all integers in coll_choices must be distinct"
for choice in coll_choices:
assert 0 <= choice <= coll_divs, "coll_choices must contain only integers from 0 to coll_divs"
coll_choices = sorted(coll_choices)
if (coll_divs, coll_choices) not in _fast_settings_VIE_2:
raise RuntimeError(
f"Collocation setting (coll_divs={coll_divs}, coll_choices={coll_choices}) "
f"not supported by D extension.")
k_c = np.ascontiguousarray(kernel_values_, dtype=np.float64)
g_c = np.ascontiguousarray(g_values_, dtype=np.float64)
N_used = len(k_c)
mesh_divs = (N_used - 1) // coll_divs**2
soln_vals, poly_coefs = _dlang_module.solve_vie2_vec_d(
g_c, k_c, time_step, coll_divs, coll_choices, return_polys)
if return_polys:
return (soln_vals, _build_vec_polys(poly_coefs, mesh_divs, coll_divs, time_step))
return soln_vals
# ------------------------------------------------------------------ scalar path
assert len(kernel_values_.shape) == 1, "kernel_values must be a 1-dim array"
if g_values is not None:
g_values_ = np.asarray(g_values, dtype=float)
assert len(g_values_.shape) == 1, "g_values must be a 1-dim array"
assert len(g_values_) == N_orig, "kernel_values and g_values must have the same length"
g_values_ = g_values_[:N]
else:
g_values_ = np.zeros(N)
assert coll_divs > 0, "coll_divs must be a positive integer"
assert all([isinstance(choice, int) for choice in coll_choices]), \
"coll_choices must be a list of integers"
assert all([coll_choices.count(c) <= 1 for c in coll_choices]), \
"all integers in coll_choices must be distinct"
for choice in coll_choices:
assert 0 <= choice <= coll_divs, "coll_choices must contain only integers from 0 to coll_divs"
coll_choices = sorted(coll_choices)
if (coll_divs, coll_choices) in _fast_settings_VIE_2:
soln_vals, poly_coefs = _dlang_module.solve_vie2_d(
g_values_, kernel_values_, time_step, coll_divs, coll_choices, return_polys)
elif _numba_available:
if show_warnings:
print("warning: falling back to slower python/numba code")
soln_vals, poly_coefs = _numba_solvers.solve_VIE_2_jit(
g_values_, kernel_values_, time_step, coll_divs, coll_choices, return_polys)
else:
raise NotImplementedError(
f"Collocation setting (coll_divs={coll_divs}, coll_choices={coll_choices}) is not "
f"supported by the D extension. Install numba to enable the fallback solver, or "
f"use a supported setting (see fast_coll_settings_VIE_2)."
)
if return_polys:
polys = []
for i, coefs in enumerate(poly_coefs):
domain = (i * coll_divs**2 * time_step, (i+1) * coll_divs**2 * time_step)
poly = np.polynomial.Polynomial(coefs, domain=domain, window=(0.0, 1.0), symbol='t')
poly = poly.convert(domain=domain, window=domain)
polys.append(poly.trim())
return (soln_vals, polys)
else:
return soln_vals
|