Source code for sporco.dictlrn.onlinecdl

# -*- coding: utf-8 -*-
# Copyright (C) 2018-2019 by Cristina Garcia-Cardona <cgarciac@lanl.gov>
#                            Brendt Wohlberg <brendt@ieee.org>
# All rights reserved. BSD 3-clause License.
# This file is part of the SPORCO package. Details of the copyright
# and user license can be found in the 'LICENSE.txt' file distributed
# with the package.

"""Online dictionary learning based on CBPDN sparse coding"""

from __future__ import print_function, absolute_import

import copy
import numpy as np

from sporco import common
from sporco.admm import cbpdn
from sporco import cuda
from sporco.util import u, Timer
from sporco.linalg import inner
from sporco.cnvrep import (DictionarySize, stdformD, Pcn, getPcn,
                           CDU_ConvRepIndexing, zpad, mskWshape)
from sporco.dictlrn import dictlrn
from sporco.array import transpose_ntpl_list
from sporco.fft import rfftn, irfftn, byte_aligned, empty_aligned


__author__ = """\n""".join(['Cristina Garcia-Cardona <cgarciac@lanl.gov>',
                            'Brendt Wohlberg <brendt@ieee.org>'])



class OnlineConvBPDNDictLearn(common.IterativeSolver):
    r"""
    Stochastic gradient descent (SGD) based online convolutional
    dictionary learning, as proposed in :cite:`liu-2018-first`.

    |

    .. inheritance-diagram:: OnlineConvBPDNDictLearn
       :parts: 2

    |
    """
    class Options(dictlrn.DictLearn.Options):
        r"""Online CBPDN dictionary learning algorithm options.

        Options:

          ``Verbose`` : Flag determining whether iteration status is
          displayed.

          ``StatusHeader`` : Flag determining whether status header and
          separator are displayed.

          ``IterTimer`` : Label of the timer to use for iteration times.

          ``DictSize`` : Dictionary size vector.

          ``DataType`` : Specify data type for solution variables,
          e.g. ``np.float32``.

          ``ZeroMean`` : Flag indicating whether the solution dictionary
          :math:`\{\mathbf{d}_m\}` should have zero-mean components.

          ``eta_a``, ``eta_b`` : Constants :math:`a` and :math:`b` used in
          setting the SGD step size, :math:`\eta`, which is set to
          :math:`a / (b + i)` where :math:`i` is the iteration index.
          See Sec. 3 (pg. 9) of :cite:`liu-2018-first`.

          ``CUDA_CBPDN`` : Flag indicating whether to use the CUDA solver
          for the CBPDN problem (see :ref:`cuda_package`).

          ``CBPDN`` : Options :class:`.admm.cbpdn.ConvBPDN.Options`.
        """

        defaults = {'Verbose': False, 'StatusHeader': True,
                    'IterTimer': 'solve', 'DictSize': None,
                    'DataType': None, 'ZeroMean': False,
                    'eta_a': 10.0, 'eta_b': 5.0, 'CUDA_CBPDN': False,
                    'CBPDN': copy.deepcopy(cbpdn.ConvBPDN.Options.defaults)}

        def __init__(self, opt=None):
            """
            Parameters
            ----------
            opt : dict or None, optional (default None)
              OnlineConvBPDNDictLearn algorithm options
            """

            dictlrn.DictLearn.Options.__init__(self, {
                'CBPDN': cbpdn.ConvBPDN.Options({
                    'MaxMainIter': 100,
                    'AutoRho': {'Period': 10, 'AutoScaling': False,
                                'RsdlRatio': 10.0, 'Scaling': 2.0,
                                'RsdlTarget': 1.0}})
                })

            if opt is None:
                opt = {}
            self.update(opt)

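    # A construction sketch for the options above (the specific values are
    # illustrative assumptions, not recommended settings): the nested
    # 'CBPDN' dict is forwarded to the inner ConvBPDN sparse coding solver.
    #
    #   opt = OnlineConvBPDNDictLearn.Options(
    #       {'Verbose': True, 'eta_a': 10.0, 'eta_b': 20.0,
    #        'CBPDN': {'rho': 5.0, 'MaxMainIter': 50}})
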
    fwiter = 4
    """Field width for iteration count display column"""
    fpothr = 2
    """Field precision for other display columns"""

    itstat_fields_objfn = ('ObjFun', 'DFid', 'RegL1')
    """Fields in IterationStats associated with the objective function"""
    itstat_fields_alg = ('PrimalRsdl', 'DualRsdl', 'Rho', 'Cnstr', 'DeltaD',
                         'Eta')
    """Fields in IterationStats associated with the specific solver
    algorithm"""
    itstat_fields_extra = ()
    """Non-standard fields in IterationStats; see :meth:`itstat_extra`"""


    def __new__(cls, *args, **kwargs):
        """Create an OnlineConvBPDNDictLearn object and start its
        initialisation timer."""

        instance = super(OnlineConvBPDNDictLearn, cls).__new__(cls)
        instance.timer = Timer(['init', 'solve', 'solve_wo_eval'])
        instance.timer.start('init')
        return instance


    def __init__(self, D0, lmbda=None, opt=None, dimK=None, dimN=2):
        """
        Parameters
        ----------
        D0 : array_like
          Initial dictionary array
        lmbda : float
          Regularisation parameter
        opt : :class:`OnlineConvBPDNDictLearn.Options` object
          Algorithm options
        dimK : 0, 1, or None, optional (default None)
          Number of signal dimensions in signal array passed to
          :meth:`solve`. If there will only be a single input signal
          (e.g. if `S` is a 2D array representing a single image)
          `dimK` must be set to 0.
        dimN : int, optional (default 2)
          Number of spatial/temporal dimensions
        """

        if opt is None:
            opt = OnlineConvBPDNDictLearn.Options()
        if not isinstance(opt, OnlineConvBPDNDictLearn.Options):
            raise TypeError('Parameter opt must be an instance of '
                            'OnlineConvBPDNDictLearn.Options')
        self.opt = opt

        if dimN != 2 and opt['CUDA_CBPDN']:
            raise ValueError('CUDA CBPDN solver can only be used when dimN=2')

        if opt['CUDA_CBPDN'] and cuda.device_count() == 0:
            raise ValueError('SPORCO-CUDA not installed or no GPU available')

        self.dimK = dimK
        self.dimN = dimN

        # DataType option overrides data type inferred from __init__
        # parameters of derived class
        self.set_dtype(opt, D0.dtype)

        # Initialise attributes representing algorithm parameters
        self.lmbda = lmbda
        self.eta_a = opt['eta_a']
        self.eta_b = opt['eta_b']
        self.set_attr('eta', opt['eta_a'] / opt['eta_b'], dval=2.0,
                      dtype=self.dtype)

        # Get dictionary size
        if self.opt['DictSize'] is None:
            self.dsz = D0.shape
        else:
            self.dsz = self.opt['DictSize']

        # Object representing problem dimensions; constructed lazily in
        # init_vars when the first training data array is seen
        self.cri = None

        # Normalise dictionary
        ds = DictionarySize(self.dsz, dimN)
        dimCd = ds.ndim - dimN - 1
        D0 = stdformD(D0, ds.nchn, ds.nflt, dimN).astype(self.dtype)
        self.D = Pcn(D0, self.dsz, (), dimN, dimCd, crp=True,
                     zm=opt['ZeroMean'])
        self.Dprv = self.D.copy()

        # Create constraint set projection function
        self.Pcn = getPcn(self.dsz, (), dimN, dimCd, crp=True,
                          zm=opt['ZeroMean'])

        # Initialise iteration stats list and iteration index
        self.itstat = []
        self.j = 0

        # Configure status display
        self.display_config()

    def solve(self, S, dimK=None):
        """Compute sparse coding and dictionary update for training
        data `S`."""

        # Use dimK specified in __init__ as default
        if dimK is None and self.dimK is not None:
            dimK = self.dimK

        # Start solve timer
        self.timer.start(['solve', 'solve_wo_eval'])

        # Solve CSC problem on S and do dictionary step
        self.init_vars(S, dimK)
        self.xstep(S, self.lmbda, dimK)
        self.dstep()

        # Stop timer for solve excluding iteration stats evaluation
        self.timer.stop('solve_wo_eval')

        # Extract and record iteration stats
        self.manage_itstat()

        # Increment iteration count
        self.j += 1

        # Stop solve timer
        self.timer.stop('solve')

        # Return current dictionary
        return self.getdict()

    def init_vars(self, S, dimK):
        """Initialise variables required for sparse coding and dictionary
        update for training data `S`."""

        Nv = S.shape[0:self.dimN]
        if self.cri is None or Nv != self.cri.Nv:
            self.cri = CDU_ConvRepIndexing(self.dsz, S, dimK, self.dimN)
            if self.opt['CUDA_CBPDN']:
                if self.cri.Cd > 1 or self.cri.Cx > 1:
                    raise ValueError('CUDA CBPDN solver can only be used for '
                                     'single channel problems')
                if self.cri.K > 1:
                    raise ValueError('CUDA CBPDN solver cannot be used with '
                                     'mini-batches')
            self.Df = byte_aligned(rfftn(self.D, self.cri.Nv, self.cri.axisN))
            self.Gf = empty_aligned(self.Df.shape, self.Df.dtype)
            self.Z = empty_aligned(self.cri.shpX, self.dtype)
        else:
            self.Df[:] = rfftn(self.D, self.cri.Nv, self.cri.axisN)

    def xstep(self, S, lmbda, dimK):
        """Solve CSC problem for training data `S`."""

        if self.opt['CUDA_CBPDN']:
            Z = cuda.cbpdn(self.D.squeeze(), S[..., 0], lmbda,
                           self.opt['CBPDN'])
            Z = Z.reshape(self.cri.Nv + (1, 1, self.cri.M,))
            self.Z[:] = np.asarray(Z, dtype=self.dtype)
            self.Zf = rfftn(self.Z, self.cri.Nv, self.cri.axisN)
            self.Sf = rfftn(S.reshape(self.cri.shpS), self.cri.Nv,
                            self.cri.axisN)
            self.xstep_itstat = None
        else:
            # Create X update object (external representation is expected!)
            xstep = cbpdn.ConvBPDN(self.D.squeeze(), S, lmbda,
                                   self.opt['CBPDN'], dimK=dimK,
                                   dimN=self.cri.dimN)
            xstep.solve()
            self.Sf = xstep.Sf
            self.setcoef(xstep.getcoef())
            self.xstep_itstat = xstep.itstat[-1] if xstep.itstat else None

    def setcoef(self, Z):
        """Set coefficient array."""

        # If the dictionary has a single channel but the input (and
        # therefore also the coefficient map array) has multiple
        # channels, the channel index and multiple image index have
        # the same behaviour in the dictionary update equation: the
        # simplest way to handle this is to just reshape so that the
        # channels also appear on the multiple image index.
        if self.cri.Cd == 1 and self.cri.C > 1:
            Z = Z.reshape(self.cri.Nv + (1,) + (self.cri.Cx*self.cri.K,) +
                          (self.cri.M,))
        if Z.shape != self.Z.shape:
            self.Z = empty_aligned(Z.shape, self.dtype)
        self.Z[:] = np.asarray(Z, dtype=self.dtype)
        self.Zf = rfftn(self.Z, self.cri.Nv, self.cri.axisN)

    def dstep(self):
        """Compute dictionary update for training data of preceding
        :meth:`xstep`.
        """

        # Compute residual X D - S in frequency domain
        Ryf = inner(self.Zf, self.Df, axis=self.cri.axisM) - self.Sf

        # Compute gradient
        gradf = inner(np.conj(self.Zf), Ryf, axis=self.cri.axisK)

        # If multiple channel signal, single channel dictionary
        if self.cri.C > 1 and self.cri.Cd == 1:
            gradf = np.sum(gradf, axis=self.cri.axisC, keepdims=True)

        # Update gradient step size
        self.eta = self.eta_a / (self.j + self.eta_b)

        # Compute gradient descent step
        self.Gf[:] = self.Df - self.eta * gradf
        self.G = irfftn(self.Gf, self.cri.Nv, self.cri.axisN)

        # Evaluate proximal operator (projection onto constraint set)
        self.Dprv[:] = self.D
        self.D[:] = self.Pcn(self.G)

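    # Note on the update computed in dstep above: with the sparse codes X
    # from the preceding xstep held fixed, the gradient of the data
    # fidelity term (1/2) || X d - s ||_2^2 with respect to the dictionary
    # is X^T (X d - s). This is evaluated in the DFT domain as
    # conj(Zf) * (Zf Df - Sf), summed over the sample axis (and over the
    # channel axis when a single-channel dictionary is applied to a
    # multi-channel signal). The SGD step D <- Pcn(D - eta * grad), with
    # step size eta = eta_a / (eta_b + j), then projects back onto the
    # constraint set via Pcn, which crops filters to their support and
    # normalises them (optionally enforcing zero mean).
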
    def manage_itstat(self):
        """Compute, record, and display iteration statistics."""

        # Extract and record iteration stats
        itst = self.iteration_stats()
        self.itstat.append(itst)
        self.display_status(self.fmtstr, itst)

    def getdict(self):
        """Get final dictionary."""

        return self.D

    def itstat_extra(self):
        """Non-standard entries for the iteration stats record tuple."""

        return ()

    @classmethod
    def hdrtxt(cls):
        """Construct tuple of status display column titles."""

        return ('Itn', 'X r', 'X s', u('X ρ'), 'D cnstr', 'D dlt',
                u('D η'))

    @classmethod
    def hdrval(cls):
        """Construct dictionary mapping display column title to
        IterationStats entries.
        """

        hdrmap = {'Itn': 'Iter', 'X r': 'PrimalRsdl', 'X s': 'DualRsdl',
                  u('X ρ'): 'Rho', 'D cnstr': 'Cnstr', 'D dlt': 'DeltaD',
                  u('D η'): 'Eta'}
        return hdrmap

    def iteration_stats(self):
        """Construct iteration stats record tuple."""

        tk = self.timer.elapsed(self.opt['IterTimer'])
        if self.xstep_itstat is None:
            objfn = (0.0,) * 3
            rsdl = (0.0,) * 2
            rho = (0.0,)
        else:
            objfn = (self.xstep_itstat.ObjFun, self.xstep_itstat.DFid,
                     self.xstep_itstat.RegL1)
            rsdl = (self.xstep_itstat.PrimalRsdl,
                    self.xstep_itstat.DualRsdl)
            rho = (self.xstep_itstat.Rho,)

        cnstr = np.linalg.norm(zpad(self.D, self.cri.Nv) - self.G)
        dltd = np.linalg.norm(self.D - self.Dprv)

        tpl = (self.j,) + objfn + rsdl + rho + (cnstr, dltd, self.eta) + \
              self.itstat_extra() + (tk,)
        return type(self).IterationStats(*tpl)

    def getitstat(self):
        """Get iteration stats as named tuple of arrays instead of
        array of named tuples.
        """

        return transpose_ntpl_list(self.itstat)

    def display_config(self):
        """Set up status display if option selected. NB: this method
        assumes that the first entry is the iteration count and the
        last is the rho value.
        """

        if self.opt['Verbose']:
            hdrtxt = type(self).hdrtxt()
            # Call utility function to construct status display formatting
            self.hdrstr, self.fmtstr, self.nsep = common.solve_status_str(
                hdrtxt, fwdth0=type(self).fwiter, fprec=type(self).fpothr)
        else:
            self.hdrstr, self.fmtstr, self.nsep = '', '', 0

    def display_start(self):
        """Start status display if option selected."""

        if self.opt['Verbose'] and self.opt['StatusHeader']:
            print(self.hdrstr)
            print("-" * self.nsep)

    def display_status(self, fmtstr, itst):
        """Display current iteration status as selection of fields from
        iteration stats tuple.
        """

        if self.opt['Verbose']:
            hdrtxt = type(self).hdrtxt()
            hdrval = type(self).hdrval()
            itdsp = tuple([getattr(itst, hdrval[col]) for col in hdrtxt])
            print(fmtstr % itdsp)

    def display_end(self):
        """Terminate status display if option selected."""

        if self.opt['Verbose'] and self.opt['StatusHeader']:
            print("-" * self.nsep)

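# Typical usage pattern for OnlineConvBPDNDictLearn, as a sketch under
# assumed shapes (`training_images` is a hypothetical iterable of 2D float
# arrays; 32 filters of size 8x8 are an arbitrary choice). Each call to
# solve performs one sparse coding step and one SGD dictionary update:
#
#   D0 = np.random.randn(8, 8, 32).astype(np.float32)
#   dlrn = OnlineConvBPDNDictLearn(D0, lmbda=0.2, dimK=0)
#   for img in training_images:
#       D = dlrn.solve(img)
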
class OnlineConvBPDNMaskDictLearn(OnlineConvBPDNDictLearn):
    r"""
    Stochastic gradient descent (SGD) based online convolutional
    dictionary learning with a spatial mask, as proposed in
    :cite:`liu-2018-first`.

    |

    .. inheritance-diagram:: OnlineConvBPDNMaskDictLearn
       :parts: 2

    |
    """
    class Options(OnlineConvBPDNDictLearn.Options):
        r"""Online masked CBPDN dictionary learning algorithm options.

        Options are the same as those of
        :class:`OnlineConvBPDNDictLearn.Options`, except for:

          ``CBPDN`` : Options :class:`.admm.cbpdn.ConvBPDNMaskDcpl.Options`.
        """

        defaults = copy.deepcopy(OnlineConvBPDNDictLearn.Options.defaults)
        defaults.update({'CBPDN': copy.deepcopy(
            cbpdn.ConvBPDNMaskDcpl.Options.defaults)})

        def __init__(self, opt=None):
            """
            Parameters
            ----------
            opt : dict or None, optional (default None)
              OnlineConvBPDNMaskDictLearn algorithm options
            """

            OnlineConvBPDNDictLearn.Options.__init__(self, {
                'CBPDN': cbpdn.ConvBPDNMaskDcpl.Options({
                    'AutoRho': {'Period': 10, 'AutoScaling': False,
                                'RsdlRatio': 10.0, 'Scaling': 2.0,
                                'RsdlTarget': 1.0}})
                })

            if opt is None:
                opt = {}
            self.update(opt)

    def solve(self, S, W=None, dimK=None):
        """Compute sparse coding and dictionary update for training
        data `S`, with optional spatial mask `W`."""

        # Use dimK specified in __init__ as default
        if dimK is None and self.dimK is not None:
            dimK = self.dimK

        # Start solve timer
        self.timer.start(['solve', 'solve_wo_eval'])

        # Solve CSC problem on S and do dictionary step
        self.init_vars(S, dimK)
        if W is None:
            W = np.array([1.0], dtype=self.dtype)
        W = np.asarray(W.reshape(mskWshape(W, self.cri)), dtype=self.dtype)
        self.xstep(S, W, self.lmbda, dimK)
        self.dstep(W)

        # Stop timer for solve excluding iteration stats evaluation
        self.timer.stop('solve_wo_eval')

        # Extract and record iteration stats
        self.manage_itstat()

        # Increment iteration count
        self.j += 1

        # Stop solve timer
        self.timer.stop('solve')

        # Return current dictionary
        return self.getdict()

    def xstep(self, S, W, lmbda, dimK):
        """Solve CSC problem for training data `S`."""

        if self.opt['CUDA_CBPDN']:
            Z = cuda.cbpdnmsk(self.D.squeeze(), S[..., 0], W.squeeze(),
                              lmbda, self.opt['CBPDN'])
            Z = Z.reshape(self.cri.Nv + (1, 1, self.cri.M,))
            self.Z[:] = np.asarray(Z, dtype=self.dtype)
            self.Zf = rfftn(self.Z, self.cri.Nv, self.cri.axisN)
            self.Sf = rfftn(S.reshape(self.cri.shpS), self.cri.Nv,
                            self.cri.axisN)
            self.xstep_itstat = None
        else:
            # Create X update object (external representation is expected!)
            xstep = cbpdn.ConvBPDNMaskDcpl(self.D.squeeze(), S, lmbda, W,
                                           self.opt['CBPDN'], dimK=dimK,
                                           dimN=self.cri.dimN)
            xstep.solve()
            self.Sf = rfftn(S.reshape(self.cri.shpS), self.cri.Nv,
                            self.cri.axisN)
            self.setcoef(xstep.getcoef())
            self.xstep_itstat = xstep.itstat[-1] if xstep.itstat else None

    def dstep(self, W):
        """Compute dictionary update for training data of preceding
        :meth:`xstep`.
        """

        # Compute residual X D - S in frequency domain
        Ryf = inner(self.Zf, self.Df, axis=self.cri.axisM) - self.Sf

        # Transform to spatial domain, apply mask, and transform back to
        # frequency domain
        Ryf[:] = rfftn(W * irfftn(Ryf, self.cri.Nv, self.cri.axisN),
                       None, self.cri.axisN)

        # Compute gradient
        gradf = inner(np.conj(self.Zf), Ryf, axis=self.cri.axisK)

        # If multiple channel signal, single channel dictionary
        if self.cri.C > 1 and self.cri.Cd == 1:
            gradf = np.sum(gradf, axis=self.cri.axisC, keepdims=True)

        # Update gradient step size
        self.eta = self.eta_a / (self.j + self.eta_b)

        # Compute gradient descent step
        self.Gf[:] = self.Df - self.eta * gradf
        self.G = irfftn(self.Gf, self.cri.Nv, self.cri.axisN)

        # Evaluate proximal operator (projection onto constraint set)
        self.Dprv[:] = self.D
        self.D[:] = self.Pcn(self.G)

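
if __name__ == '__main__':
    # Minimal runnable sketch of the masked variant (illustrative only: the
    # array shapes, random data, mask density, lmbda, step size constants,
    # and iteration count are assumptions chosen for brevity, not
    # recommendations). In practice a different training image would be
    # supplied on each call to solve.
    np.random.seed(12345)
    D0 = np.random.randn(8, 8, 16).astype(np.float32)   # 16 8x8 filters
    S = np.random.randn(64, 64).astype(np.float32)      # single "image"
    W = (np.random.rand(64, 64) > 0.2).astype(np.float32)  # spatial mask

    opt = OnlineConvBPDNMaskDictLearn.Options(
        {'Verbose': True, 'DataType': np.float32, 'eta_a': 10.0,
         'eta_b': 20.0, 'CBPDN': {'rho': 5.0, 'MaxMainIter': 50}})
    dlrn = OnlineConvBPDNMaskDictLearn(D0, lmbda=0.2, opt=opt, dimK=0)

    dlrn.display_start()
    for it in range(5):
        # Each call performs one sparse coding + SGD dictionary step
        D = dlrn.solve(S, W)
    dlrn.display_end()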