A NIFTy demonstration

IFT: Big Picture

IFT starting point:

$$d = Rs+n$$

Typically, $s$ is a continuous field and $d$ a discrete data vector. In particular, $R$ is not invertible.

IFT aims at inverting the above non-invertible problem in the best possible way using Bayesian statistics.
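
To see the structure of the problem, here is a hypothetical toy version of the measurement equation in plain NumPy (all shapes and numbers below are made up for illustration): the signal is finely sampled, while the response averages it down to fewer data points, so $R$ is non-square and has no inverse.

import numpy as np

s = np.sin(np.linspace(0, 2 * np.pi, 8))   # finely sampled "continuous" signal
R = np.kron(np.eye(4), [.5, .5])           # 4x8 response: pairwise averaging
n = .1 * np.random.randn(4)                # Gaussian noise realization
d = R.dot(s) + n                           # 4 data points from 8 signal values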

NIFTy

NIFTy (Numerical Information Field Theory) is a Python framework in which IFT problems can be tackled easily.

Main Interfaces:

  • Spaces: Cartesian grids, 2-spheres (HEALPix, Gauss-Legendre) and their respective harmonic spaces.
  • Fields: Defined on spaces.
  • Operators: Acting on fields defined on spaces (see the sketch below).
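
A minimal sketch of how these interfaces fit together, using only classes that appear later in this demo (the values are arbitrary):

sp = RGSpace(64)                          # a 1D Cartesian space
f = Field(sp, val=1.)                     # a constant field living on it
Op = DiagonalOperator(sp, diagonal=2.)    # an operator defined on that space
g = Op(f)                                 # operators act on fields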

Wiener Filter: Formulae

Assumptions

  • $d=Rs+n$, $R$ linear operator.
  • $\mathcal P (s) = \mathcal G (s,S)$, $\mathcal P (n) = \mathcal G (n,N)$ where $S, N$ are positive definite matrices.

Posterior

The posterior is given by:

$$\mathcal P (s|d) \propto \mathcal P(s,d) = \mathcal G(d-Rs,N)\,\mathcal G(s,S) \propto \mathcal G (s-m,D) $$

where $$\begin{align} m &= Dj \\ D^{-1}&= (S^{-1} +R^\dagger N^{-1} R )\\ j &= R^\dagger N^{-1} d \end{align}$$
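
Before implementing this in NIFTy, the formulae can be sanity-checked on a toy problem with explicit matrices (a minimal NumPy sketch; all numbers are arbitrary):

import numpy as np

R = np.eye(2)                          # trivial response
S = np.array([[2., .5], [.5, 1.]])     # signal covariance
N = .5 * np.eye(2)                     # noise covariance
d = np.array([1., -.3])                # data vector

D = np.linalg.inv(np.linalg.inv(S) + R.T.dot(np.linalg.inv(N)).dot(R))
j = R.T.dot(np.linalg.inv(N)).dot(d)
m = D.dot(j)                           # posterior mean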

Let us implement this in NIFTy!

Wiener Filter: Example

  • One-dimensional signal with power spectrum $$P(k) = P_0\,\left(1+\left(\frac{k}{k_0}\right)^2\right)^{-\gamma/2},$$ with $P_0 = 0.2$, $k_0 = 5$, $\gamma = 4$. Recall: $P(k)$ defines an isotropic and homogeneous $S$.
  • $N = 0.5 \cdot \text{id}$.
  • Number of data points: $N_{pix} = 512$.
  • Response operator: $$R_x=\begin{pmatrix} \delta(x-0)\\ \delta(x-1)\\ \vdots\\ \delta(x-511) \end{pmatrix}.$$ However, on the computer the signal space is discrete as well, so $R = \text{id}$.
In [27]:
N_pixels = 512     # Number of pixels
sigma2 = .5        # Noise variance

def pow_spec(k):
    P0, k0, gamma = [.2, 5, 4]   # parameters as specified above
    return P0 * (1. + (k/k0)**2)**(- gamma / 2)

Wiener Filter: Implementation

Import Modules

In [28]:
import matplotlib.pyplot as plt
import numpy as np
from nifty import (DiagonalOperator, EndomorphicOperator, FFTOperator, Field,
                   InvertibleOperatorMixin, PowerSpace, RGSpace,
                   create_power_operator, SmoothingOperator, DiagonalProberMixin, Prober)

Implement Propagator

In [29]:
class PropagatorOperator(InvertibleOperatorMixin, EndomorphicOperator):
    def __init__(self, R, N, Sh, default_spaces=None):
        # Harmonic transform between the signal domain and the domain of Sh.
        # Built before calling super() so the preconditioner closure uses
        # this local operator instead of a global `fft` defined elsewhere.
        fft = FFTOperator(domain=R.domain, target=Sh.domain)
        super(PropagatorOperator, self).__init__(
            default_spaces=default_spaces,
            preconditioner=lambda x: fft.adjoint_times(Sh.times(fft.times(x))))

        self.R = R
        self.N = N
        self.Sh = Sh
        self._domain = R.domain
        self.fft = fft

    def _inverse_times(self, x, spaces, x0=None):
        # D^{-1} x = R^dagger N^{-1} R x + F^dagger S_h^{-1} F x
        return self.R.adjoint_times(self.N.inverse_times(self.R(x))) \
               + self.fft.adjoint_times(self.Sh.inverse_times(self.fft(x)))

    @property
    def domain(self):
        return self._domain

    @property
    def unitary(self):
        return False

    @property
    def symmetric(self):
        return False

    @property
    def self_adjoint(self):
        return True

Conjugate Gradient Preconditioning

  • $D$ is defined via: $$D^{-1} = \mathcal F^\dagger S_h^{-1}\mathcal F + R^\dagger N^{-1} R.$$ In the end, we want to apply $D$ to $j$, i.e. we need the inverse action of $D^{-1}$. This is done numerically (algorithm: Conjugate Gradient).

  • One can define the condition number of a non-singular and normal matrix $A$: $$\kappa (A) := \frac{|\lambda_{\text{max}}|}{|\lambda_{\text{min}}|},$$ where $\lambda_{\text{max}}$ and $\lambda_{\text{min}}$ are the largest and smallest eigenvalue of $A$, respectively.

  • The larger $\kappa$, the slower the Conjugate Gradient converges.

  • By default, Conjugate Gradient solves $D^{-1} m = j$ for $m$, where $D^{-1}$ can be badly conditioned. If one knows a non-singular matrix $T$ for which $TD^{-1}$ is better conditioned, one can solve the equivalent problem: $$\tilde A m = \tilde j,$$ where $\tilde A = T D^{-1}$ and $\tilde j = Tj$.

  • In our case, $S^{-1}$ is responsible for the bad conditioning of $D^{-1}$, depending on the chosen power spectrum. Thus we choose (see the toy illustration below)

$$T = \mathcal F^\dagger S_h \mathcal F.$$
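
The effect can be illustrated with a toy NumPy computation (diagonal stand-ins for the operators; all numbers are made up): a steep power spectrum drives up $\kappa(D^{-1})$, whereas $\kappa(TD^{-1})$ stays small.

import numpy as np

S_inv = np.diag([1., 1e1, 1e2, 1e3])   # stand-in for F^dagger S_h^{-1} F
M = 2. * np.eye(4)                     # stand-in for R^dagger N^{-1} R
D_inv = S_inv + M

T = np.linalg.inv(S_inv)               # preconditioner T = S
print(np.linalg.cond(D_inv))           # ~3.3e2, driven by the spectrum
print(np.linalg.cond(T.dot(D_inv)))    # ~3, nearly independent of it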

Generate Mock Data

  • Draw a signal $s$ and a noise realization $n$ from their respective covariances.
  • Calculate the data $d$.
In [30]:
s_space = RGSpace(N_pixels)
fft = FFTOperator(s_space)
h_space = fft.target[0]
p_space = PowerSpace(h_space)


# Operators
Sh = create_power_operator(h_space, power_spectrum=pow_spec)
N = DiagonalOperator(s_space, diagonal=sigma2, bare=True)
R = DiagonalOperator(s_space, diagonal=1.)
D = PropagatorOperator(R=R, N=N, Sh=Sh)

# Fields and data
sh = Field(p_space, val=pow_spec).power_synthesize(real_signal=True)
s = fft.adjoint_times(sh)
n = Field.from_random(domain=s_space, random_type='normal',
                      std=np.sqrt(sigma2), mean=0)
d = R(s) + n
j = R.adjoint_times(N.inverse_times(d))

Run Wiener Filter

In [32]:
m = D(j)

Create Power Spectra of Signal and Reconstruction

In [33]:
s_power = sh.power_analyze()
m_power = fft(m).power_analyze()
s_power_data = s_power.val.get_full_data().real
m_power_data = m_power.val.get_full_data().real

# Get signal data and reconstruction data
s_data = s.val.get_full_data().real
m_data = m.val.get_full_data().real

d_data = d.val.get_full_data().real

Signal Reconstruction

In [34]:
plt.plot(s_data, 'k', label="Signal", alpha=.5, linewidth=.5)
plt.plot(d_data, 'k+', label="Data")
plt.plot(m_data, 'r', label="Reconstruction")
plt.title("Reconstruction")
plt.legend()
plt.show()
In [35]:
plt.figure()
plt.plot(s_data - s_data, 'k', label="Signal", alpha=.5, linewidth=.5)  # zero baseline: signal minus itself
plt.plot(d_data - s_data, 'k+', label="Data")
plt.plot(m_data - s_data, 'r', label="Reconstruction")
plt.axhspan(-np.sqrt(sigma2),np.sqrt(sigma2), facecolor='0.9', alpha=.5)
plt.title("Residuals")
plt.legend()
plt.show()

Power Spectrum

In [36]:
plt.loglog()
plt.xlim(1, int(N_pixels/2))
ymin = min(m_power_data)
plt.ylim(ymin, 1)
xs = np.arange(1,int(N_pixels/2),.1)
plt.plot(xs, pow_spec(xs), label="True Power Spectrum", linewidth=.7, color='k')
plt.plot(s_power_data, 'k', label="Signal", alpha=.5, linewidth=.5)
plt.plot(m_power_data, 'r', label="Reconstruction")
plt.axhline(sigma2 / N_pixels, color="k", linestyle='--', label="Noise level", alpha=.5)
plt.axhspan(sigma2 / N_pixels, ymin, facecolor='0.9', alpha=.5)
plt.title("Power Spectrum")
plt.legend()
plt.show()

Wiener Filter on Incomplete Data

In [37]:
# Operators
Sh = create_power_operator(h_space, power_spectrum=pow_spec)
N = DiagonalOperator(s_space, diagonal=sigma2, bare=True)
# R is defined below

# Fields
sh = Field(p_space, val=pow_spec).power_synthesize(real_signal=True)
s = fft.adjoint_times(sh)
n = Field.from_random(domain=s_space, random_type='normal',
                      std=np.sqrt(sigma2), mean=0)

Partially Lose Data

In [38]:
l = int(N_pixels * 0.2)
h = int(N_pixels * 0.2 * 4)

mask = Field(s_space, val=1)
mask.val[l:h] = 0

R = DiagonalOperator(s_space, diagonal=mask)
n.val[l:h] = 0   # no noise where no data are taken

d = R(s) + n
In [39]:
D = PropagatorOperator(R=R, N=N, Sh=Sh)
j = R.adjoint_times(N.inverse_times(d))
m = D(j)

Compute Uncertainty
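
The pixel-wise uncertainty of the reconstruction is the square root of the diagonal of the posterior covariance $D$. Since $D$ is only available implicitly, its diagonal is estimated stochastically with NIFTy's probing machinery.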

In [40]:
class DiagonalProber(DiagonalProberMixin, Prober):
    def __init__(self, *args, **kwargs):
        super(DiagonalProber,self).__init__(*args, **kwargs)

diagProber = DiagonalProber(domain=s_space, probe_dtype=np.complex, probe_count=200)
diagProber(D)
m_var = Field(s_space, val=diagProber.diagonal.val).weight(-1)

Get Data

In [41]:
s_power = sh.power_analyze()
m_power = fft(m).power_analyze()
s_power_data = s_power.val.get_full_data().real
m_power_data = m_power.val.get_full_data().real

# Get signal data and reconstruction data
s_data = s.val.get_full_data().real
m_data = m.val.get_full_data().real
m_var_data = m_var.val.get_full_data().real
uncertainty = np.sqrt(np.abs(m_var_data))

d_data = d.val.get_full_data().real

# Set lost data to NaN for proper plotting
d_data[d_data == 0] = np.nan
In [42]:
fig = plt.figure(figsize=(15,10))
plt.plot(s_data, 'k', label="Signal", alpha=.5, linewidth=1)
plt.plot(d_data, 'k+', label="Data", alpha=1)
plt.axvspan(l, h, facecolor='0.8', alpha=.5)
plt.title("Incomplete Data")
plt.legend()
In [44]:
fig = plt.figure(figsize=(15,10))
plt.plot(s_data, 'k', label="Signal", alpha=1, linewidth=1)
plt.plot(d_data, 'k+', label="Data", alpha=.5)
plt.plot(m_data, 'r', label="Reconstruction")
plt.axvspan(l, h, facecolor='0.8', alpha=.5)
plt.fill_between(range(N_pixels), m_data - uncertainty, m_data + uncertainty, facecolor='0')
plt.title("Reconstruction of incomplete data")
plt.legend()

2d Example

In [46]:
N_pixels = 256      # Number of pixels
sigma2 = 1000        # Noise variance


def pow_spec(k):
    P0, k0, gamma = [.2, 20, 4]
    return P0 * (1. + (k/k0)**2)**(- gamma / 2)


s_space = RGSpace([N_pixels, N_pixels])
In [52]:
fft = FFTOperator(s_space)
h_space = fft.target[0]
p_space = PowerSpace(h_space)

# Operators
Sh = create_power_operator(h_space, power_spectrum=pow_spec)
N = DiagonalOperator(s_space, diagonal=sigma2, bare=True)
R = SmoothingOperator(s_space, sigma=.01)
D = PropagatorOperator(R=R, N=N, Sh=Sh)  # note: R and D are redefined below after masking

# Fields and data
sh = Field(p_space, val=pow_spec).power_synthesize(real_signal=True)
s = fft.adjoint_times(sh)
n = Field.from_random(domain=s_space, random_type='normal',
                      std=np.sqrt(sigma2), mean=0)

# Lose some data

l = int(N_pixels * 0.2)
h = int(N_pixels * 0.2 * 2)

mask = Field(s_space, val=1)
mask.val[l:h, l:h] = 0

R = DiagonalOperator(s_space, diagonal=mask)
n.val[l:h, l:h] = 0
D = PropagatorOperator(R=R, N=N, Sh=Sh)

d = R(s) + n
j = R.adjoint_times(N.inverse_times(d))

# Run Wiener filter
m = D(j)

# Uncertainty
diagProber = DiagonalProber(domain=s_space, probe_dtype=np.complex, probe_count=10)
diagProber(D)
m_var = Field(s_space, val=diagProber.diagonal.val).weight(-1)

# Get data
s_power = sh.power_analyze()
m_power = fft(m).power_analyze()
s_power_data = s_power.val.get_full_data().real
m_power_data = m_power.val.get_full_data().real
s_data = s.val.get_full_data().real
m_data = m.val.get_full_data().real
m_var_data = m_var.val.get_full_data().real
d_data = d.val.get_full_data().real

uncertainty = np.sqrt(np.abs(m_var_data))
In [53]:
cm = ['magma', 'inferno', 'plasma', 'viridis'][1]

mi = np.min(s_data)
ma = np.max(s_data)

fig, axes = plt.subplots(1, 2, figsize=(15, 7))

data = [s_data, d_data]
caption = ["Signal", "Data"]

for ax in axes.flat:
    im = ax.imshow(data.pop(0), interpolation='nearest', cmap=cm, vmin=mi,
                   vmax=ma)
    ax.set_title(caption.pop(0))

fig.subplots_adjust(right=0.8)
cbar_ax = fig.add_axes([0.85, 0.15, 0.05, 0.7])
fig.colorbar(im, cax=cbar_ax)