import casadi as ca
import torch
import torch.nn.functional as F
import numpy as np


class Net(torch.nn.Module):
    """Small fully connected network: input -> hidden -> hidden -> output.

    The layer widths are remembered on the instance so a wrapper (e.g. the
    CasADi callback below) can query the expected input/output dimensions.
    """

    def __init__(self, n_feature, n_hidden, n_output):
        super(Net, self).__init__()
        self.input = torch.nn.Linear(n_feature, n_hidden)    # input -> hidden
        self.hidden = torch.nn.Linear(n_hidden, n_hidden)    # hidden -> hidden
        self.predict = torch.nn.Linear(n_hidden, n_output)   # hidden -> output
        self.device = torch.device("cpu")
        self.dtype = torch.float
        # Expose dimensions for callers that need the I/O shapes.
        self.n_feature = n_feature
        self.n_output = n_output

    def forward(self, x):
        """Run the network on a (batch, n_feature) tensor; returns (batch, n_output)."""
        # ReLU activation on both hidden layers.
        x = F.relu(self.input(x))
        x = F.relu(self.hidden(x))
        # Linear output layer (no activation): r is the prediction.
        r = self.predict(x)
        return r
# Module-level network instance (2 features -> 1 output) wrapped by NetCallback below.
net = Net(n_feature=2, n_hidden=10, n_output=1)
class NetCallback(ca.Callback):
    """CasADi Callback that numerically evaluates a fixed pytorch ``Net``.

    No analytic derivatives are provided; construct with
    ``{"enable_fd": True}`` so CasADi falls back to finite differences.
    """

    def __init__(self, name, net, opts=None):
        """name: CasADi function name; net: a Net instance; opts: Callback options."""
        ca.Callback.__init__(self)
        self.net = net
        # opts defaults to None instead of a mutable {} shared across calls.
        self.construct(name, {} if opts is None else opts)

    def get_n_in(self):
        return 1

    def get_n_out(self):
        return 1

    def get_sparsity_in(self, i):
        # One dense column vector of network features.
        return ca.Sparsity.dense(self.net.n_feature, 1)

    def get_sparsity_out(self, i):
        return ca.Sparsity.dense(self.net.n_output, 1)

    def eval(self, arg):
        # CasADi hands in an (n_feature, 1) DM; transpose into a 1-row batch for torch.
        arg0 = torch.tensor(np.array(arg[0]).T, device=self.net.device, dtype=self.net.dtype)
        # Transpose back so the result matches the declared (n_output, 1) sparsity.
        # This is a no-op for n_output == 1 but required for vector-valued nets.
        return [ca.DM(self.net(arg0).detach().numpy().T)]
# Decision variable and NLP setup: minimise the network output over w.
w = ca.MX.sym('w', 2)
# enable_fd: let CasADi finite-difference the callback, since NetCallback
# provides no analytic derivatives.
casadi_net = NetCallback('test', net, {"enable_fd": True})
prob = {'f': casadi_net(w), 'x': w, }
# Limited-memory (L-BFGS) Hessian: IPOPT cannot obtain exact second
# derivatives through the black-box callback.
options = {"ipopt": {"hessian_approximation": "limited-memory"}}
solver = ca.nlpsol('solver', 'ipopt', prob, options)
sol = solver(x0=ca.DM([1, 3]))
w_opt = sol['x'].full().flatten()
# NOTE(review): draft of a reverse-mode AD hook for the callback, pasted at
# module level in the discussion -- it is not attached to any class and will
# not run as-is.
def get_reverse(self, nadj, name, inames, onames, opts):
    # NOTE(review): forward() is called without arguments, but Net.forward
    # requires an input tensor -- this line would raise a TypeError.
    self.net.forward()
    adj_seed = [torch.Tensor] #???? here I am stuck how I should define the seeds
    nominal_in = self.mx_in()
    nominal_out = self.mx_out()
    # NOTE(review): adj_seed is immediately overwritten here, so the torch
    # seed list above is never used.
    adj_seed = self.mx_out()
    # NOTE(review): ``ca.callback.call`` looks like a typo -- presumably a
    # Callback instance's .call() was intended.
    return ca.Function(name, nominal_in + nominal_out + adj_seed, ca.callback.call(nominal_in + adj_seed),
                       inames, onames)
The trick is to return an instance of another Callback.
It should be fairly easy to create an abstraction on this whole Callback thing where you simply provide numerical evaluation and numerical derivatives.
I'm open to syntax suggestions.
Best regards,
Joris
I see, like in the tensorflow case. Ok I'll have a look and let you know tomorrow.
Thanks!
class PytorchEvaluator(ca.Callback):
    """CasADi Callback around a pytorch computation graph.

    Mirrors the TensorFlowEvaluator pattern from the CasADi examples:
    numerical evaluation in ``eval`` and one level of reverse-mode AD
    provided by returning another PytorchEvaluator from ``get_reverse``.

    NOTE(review): ``eval`` ignores ``arg`` and returns the values the output
    tensors had when the graph was built, so the callback is only correct at
    the point where ``t_out`` was originally computed.  A complete fix needs
    a callable that re-runs the graph for new inputs -- TODO confirm the
    intended usage with the thread author.
    """

    def __init__(self, t_in, t_out, opts=None):
        """
        t_in: list of inputs (pytorch tensors)
        t_out: list of outputs (pytorch tensors)
        opts: optional dict of Callback options (default: no options)
        """
        ca.casadi.Callback.__init__(self)
        assert isinstance(t_in, list)
        self.t_in = t_in
        assert isinstance(t_out, list)
        self.t_out = t_out
        # opts defaults to None instead of a mutable {} shared across instances.
        self.construct("PytorchEvaluator", {} if opts is None else opts)
        self.refs = []

    def get_n_in(self):
        return len(self.t_in)

    def get_n_out(self):
        return len(self.t_out)

    def get_sparsity_in(self, i):
        return ca.Sparsity.dense(*list(self.t_in[i].size()))

    def get_sparsity_out(self, i):
        return ca.Sparsity.dense(*list(self.t_out[i].size()))

    def eval(self, arg):
        # NOTE(review): ``arg`` is not fed back into the graph (see the class
        # docstring); the cached output-tensor values are returned unchanged.
        return [ca.DM(t.detach().numpy()) for t in self.t_out]

    # Like vanilla tensorflow, only reverse-mode AD is offered.
    def has_reverse(self, nadj):
        return nadj == 1

    def get_reverse(self, nadj, name, inames, onames, opts):
        """Return a ca.Function computing one adjoint sweep of the graph."""
        # Differentiable seed tensors acting as the reverse-mode seeds.
        adj_seed = [torch.ones(self.sparsity_out(i).shape[0], dtype=torch.float,
                               device=torch.device("cpu"), requires_grad=True)
                    for i in range(self.n_out())]
        # Use autograd.grad instead of per-output .backward():
        #  * grad_outputs wires the seeds in with the correct (output) shape --
        #    the previous code seeded with the *input* shape and never used
        #    adj_seed at all,
        #  * no gradient accumulation into .grad across repeated calls,
        #  * create_graph keeps the result differentiable w.r.t. the seeds.
        out = list(torch.autograd.grad(self.t_out, self.t_in, grad_outputs=adj_seed,
                                       retain_graph=True, create_graph=True))
        # Wrap the adjoint computation in another evaluator instance...
        callback = PytorchEvaluator(self.t_in + adj_seed, out)
        # ...and keep a reference so it is not garbage-collected.
        self.refs.append(callback)
        # Package it in the nominal_in + nominal_out + adj_seed form CasADi expects.
        nominal_in = self.mx_in()
        nominal_out = self.mx_out()
        adj_seed = self.mx_out()
        return ca.Function(name, nominal_in + nominal_out + adj_seed,
                           callback.call(nominal_in + adj_seed), inames, onames)
# Minimal demo: minimise f(w) = (w - 2)^2 through the evaluator.
# x requires grad so get_reverse can differentiate y w.r.t. it.
x = torch.tensor([4], dtype=torch.float, device=torch.device('cpu'), requires_grad=True)
y = (x - 2) ** 2
evaluator = PytorchEvaluator([x], [y])
w = ca.MX.sym('w')
prob = {'f': evaluator.call([w])[0], 'x': w, }
solver = ca.nlpsol('solver', 'ipopt', prob)
sol = solver(x0=ca.DM([1]))
# Expected minimiser is w = 2 -- but only if the callback actually
# re-evaluates at new w (see the note on PytorchEvaluator.eval).
print(sol['x'])
# NOTE(review): stray snippet repeated from get_reverse in the discussion;
# not executable here (name, nominal_in, nominal_out, adj_seed, callback,
# inames and onames are all undefined at module level).
ca.Function(name, nominal_in + nominal_out + adj_seed, callback.call(nominal_in + adj_seed),
            inames, onames)
I suspect that you could get an infinitely differentiable PyTorchEvaluator too with a bit more effort.
Best,
Joris
Get a function that calculates nadj adjoint derivatives.