Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions clustpy/deep/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def encode_batchwise(dataloader: torch.utils.data.DataLoader, neural_network: to
if type(embedded_data) is tuple:
embedded_data = embedded_data[0]
if embeddings_numpy is None:
embeddings_numpy = np.zeros((len(dataloader.dataset), embedded_data.shape[1]), dtype=float)
embeddings_numpy = np.zeros([len(dataloader.dataset)] + list(embedded_data.shape[1:]), dtype=float)
embeddings_numpy[batch[0]] = embedded_data.detach().cpu().numpy()
return embeddings_numpy

Expand Down Expand Up @@ -218,7 +218,7 @@ def decode_batchwise(dataloader: torch.utils.data.DataLoader, neural_network: to
else:
decoded_data = neural_network.decode(embedded_data)
if decodings_numpy is None:
decodings_numpy = np.zeros((len(dataloader.dataset), decoded_data.shape[1]), dtype=float)
decodings_numpy = np.zeros([len(dataloader.dataset)] + list(decoded_data.shape[1:]), dtype=float)
decodings_numpy[batch[0]] = decoded_data.detach().cpu().numpy()
return decodings_numpy

Expand Down Expand Up @@ -255,8 +255,8 @@ def encode_decode_batchwise(dataloader: torch.utils.data.DataLoader, neural_netw
else:
decoded_data = neural_network.decode(embedded_data)
if embeddings_numpy is None:
embeddings_numpy = np.zeros((len(dataloader.dataset), embedded_data.shape[1]), dtype=float)
decodings_numpy = np.zeros((len(dataloader.dataset), decoded_data.shape[1]), dtype=float)
embeddings_numpy = np.zeros([len(dataloader.dataset)] + list(embedded_data.shape[1:]), dtype=float)
decodings_numpy = np.zeros([len(dataloader.dataset)] + list(decoded_data.shape[1:]), dtype=float)
embeddings_numpy[batch[0]] = embedded_data.detach().cpu().numpy()
decodings_numpy[batch[0]] = decoded_data.detach().cpu().numpy()
return embeddings_numpy, decodings_numpy
Expand Down
1 change: 1 addition & 0 deletions clustpy/deep/neural_networks/_abstract_autoencoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ def load_parameters(self, path: str | Path) -> '_AbstractAutoencoder':
this instance of the autoencoder
"""
self.load_state_dict(torch.load(path, weights_only=True, map_location=get_device_from_module(self)))
self.eval()
self.fitted = True
return self

Expand Down
35 changes: 32 additions & 3 deletions clustpy/deep/neural_networks/convolutional_autoencoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,23 @@ class ConvolutionalAutoencoder(_AbstractAutoencoder):
indicates whether the autoencoder is already fitted
work_on_copy : bool
indicates whether deep clustering algorithms should work on a copy of the original autoencoder


Examples
----------
>>> from clustpy.deep.neural_networks import ConvolutionalAutoencoder
>>> from clustpy.data import load_usps
>>> from clustpy.utils import plot_image
>>> import torchvision
>>> dataset = load_usps()
>>> X = dataset.images
>>> X = X / 255.
>>> X = X.reshape(-1, 1, X.shape[1], X.shape[1])
>>> X = np.tile(X, (1, 3, 1, 1))
>>> X = torchvision.transforms.Resize((32, 32))(torch.from_numpy(X).float()).numpy()
>>> cae = ConvolutionalAutoencoder(X.shape[2], [512, 10]).fit(data=X[:500], n_epochs=100)
>>> Z = cae.decode(cae.encode(torch.from_numpy(X[0]).float())).detach().numpy()
>>> plot_image(Z, image_shape=(16, 16), min_value=0, max_value=1)

References
----------
He, Kaiming, et al. "Deep residual learning for image recognition."
Expand All @@ -87,6 +103,8 @@ def __init__(self, input_height: int, fc_layers: list, conv_encoder_name: str =
work_on_copy: bool = True, random_state: np.random.RandomState | int = None, **fc_kwargs):
super().__init__(work_on_copy, random_state)
self.allow_nd_input = True
if input_height % 32 != 0:
raise ValueError(f"Input_height has to be a multiple of 32. Your input: {input_height}")
self.input_height = input_height

# Check if layers match
Expand Down Expand Up @@ -141,8 +159,14 @@ def encode(self, x: torch.Tensor) -> torch.Tensor:
embedded : torch.Tensor
the embedded data point with dimensionality embedding_size
"""
embedded = self.conv_encoder(x)
x_adj = x.reshape(1, x.shape[0], x.shape[1] ,x.shape[2]) if x.ndim == 3 else x
if x_adj.shape[1:] != (3, self.input_height, self.input_height):
raise ValueError("Input layer of the encoder ({0}) does not match shape of the input sample ({1})".format((3, self.input_height, self.input_height),
x_adj.shape[1:]))
embedded = self.conv_encoder(x_adj)
embedded = self.fc_encoder(embedded)
if x.ndim == 3:
embedded = embedded[0]
return embedded

def decode(self, embedded: torch.Tensor) -> torch.Tensor:
Expand All @@ -159,6 +183,11 @@ def decode(self, embedded: torch.Tensor) -> torch.Tensor:
decoded : torch.Tensor
returns the reconstruction of embedded
"""
decoded = self.fc_decoder(embedded)
embedded_adj = embedded.reshape((1, -1)) if embedded.ndim == 1 else embedded
if embedded_adj.shape[1] != self.fc_decoder.layers[0]:
raise ValueError("Input layer of the decoder does not match input sample")
decoded = self.fc_decoder(embedded_adj)
decoded = self.conv_decoder(decoded)
if embedded.ndim == 1:
decoded = decoded[0]
return decoded
16 changes: 11 additions & 5 deletions clustpy/deep/neural_networks/feedforward_autoencoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,13 @@ def encode(self, x: torch.Tensor) -> torch.Tensor:
embedded : torch.Tensor
the embedded data point with dimensionality embedding_size
"""
if x.shape[1] != self.encoder.layers[0]:
x_adj = x.reshape((1, -1)) if x.ndim == 1 else x
if x_adj.shape[1] != self.encoder.layers[0]:
raise ValueError("Input layer of the encoder ({0}) does not match input sample ({1})".format(self.encoder.layers[0],
x.shape[1]))
embedded = self.encoder(x)
x_adj.shape[1]))
embedded = self.encoder(x_adj)
if x.ndim == 1:
embedded = embedded[0]
return embedded

def decode(self, embedded: torch.Tensor) -> torch.Tensor:
Expand All @@ -115,7 +118,10 @@ def decode(self, embedded: torch.Tensor) -> torch.Tensor:
decoded : torch.Tensor
returns the reconstruction of embedded
"""
if embedded.shape[1] != self.decoder.layers[0]:
embedded_adj = embedded.reshape((1, -1)) if embedded.ndim == 1 else embedded
if embedded_adj.shape[1] != self.decoder.layers[0]:
raise ValueError("Input layer of the decoder does not match input sample")
decoded = self.decoder(embedded)
decoded = self.decoder(embedded_adj)
if embedded.ndim == 1:
decoded = decoded[0]
return decoded
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from clustpy.deep import DCN
import torch
import numpy as np
import pytest


def test_convolutional_autoencoder_resnet18():
Expand All @@ -10,12 +11,17 @@ def test_convolutional_autoencoder_resnet18():
data_batch = torch.Tensor(data[:batch_size])
embedding_dim = 10
autoencoder = ConvolutionalAutoencoder(32, [512, embedding_dim])
autoencoder.eval()
# Test encoding
embedded = autoencoder.encode(data_batch)
assert embedded.shape == (batch_size, embedding_dim)
embedded_solo = autoencoder.encode(data_batch[0])
assert embedded_solo.shape == (embedding_dim, )
# Test decoding
decoded = autoencoder.decode(embedded)
assert decoded.shape == (batch_size, 3, 32, 32)
decoded_solo = autoencoder.decode(embedded[0])
assert decoded_solo.shape == (3, 32, 32)
# Test forwarding
forwarded = autoencoder.forward(data_batch)
assert torch.equal(decoded, forwarded)
Expand All @@ -31,12 +37,17 @@ def test_convolutional_autoencoder_resnet_50():
data_batch = torch.Tensor(data[:batch_size])
embedding_dim = 10
autoencoder = ConvolutionalAutoencoder(32, [2048, embedding_dim], conv_encoder_name="resnet50")
autoencoder.eval()
# Test encoding
embedded = autoencoder.encode(data_batch)
assert embedded.shape == (batch_size, embedding_dim)
embedded_solo = autoencoder.encode(data_batch[0])
assert embedded_solo.shape == (embedding_dim, )
# Test decoding
decoded = autoencoder.decode(embedded)
assert decoded.shape == (batch_size, 3, 32, 32)
decoded_solo = autoencoder.decode(embedded[0])
assert decoded_solo.shape == (3, 32, 32)
# Test forwarding
forwarded = autoencoder.forward(data_batch)
assert torch.equal(decoded, forwarded)
Expand All @@ -49,12 +60,17 @@ def test_mixed_convolutional_autoencoder():
embedding_dim = 10
autoencoder = ConvolutionalAutoencoder(32, [2048, embedding_dim], fc_decoder_layers=[embedding_dim, 512],
conv_encoder_name="resnet50", conv_decoder_name="resnet18")
autoencoder.eval()
# Test encoding
embedded = autoencoder.encode(data_batch)
assert embedded.shape == (batch_size, embedding_dim)
embedded_solo = autoencoder.encode(data_batch[0])
assert embedded_solo.shape == (embedding_dim, )
# Test decoding
decoded = autoencoder.decode(embedded)
assert decoded.shape == (batch_size, 3, 32, 32)
decoded_solo = autoencoder.decode(embedded[0])
assert decoded_solo.shape == (3, 32, 32)
# Test forwarding
forwarded = autoencoder.forward(data_batch)
assert torch.equal(decoded, forwarded)
Expand All @@ -71,3 +87,15 @@ def test_convolutional_autoencoder_in_deep_clustering():
assert dcn.labels_.shape == (100,)
X_embed = dcn.transform(data)
assert X_embed.shape == (data.shape[0], dcn.embedding_size)


def test_convolutional_autoencoder_errors():
with pytest.raises(ValueError):
# Wrong input height (must be 32 x X)
ConvolutionalAutoencoder(16, [512, 10])
with pytest.raises(ValueError):
# Wrong fc_layers for resnet 18
ConvolutionalAutoencoder(32, conv_encoder_name="resnet18", fc_layers=[2048, 10])
with pytest.raises(ValueError):
# Wrong fc_layers for resnet 50
ConvolutionalAutoencoder(32, conv_encoder_name="resnet50", fc_layers=[512, 10])
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ def test_feedforward_autoencoder():
# Test encoding
embedded = autoencoder.encode(data_batch)
assert embedded.shape == (batch_size, embedding_dim)
embedded_solo = autoencoder.encode(data_batch[0])
assert embedded_solo.shape == (embedding_dim, )
# Test decoding
decoded = autoencoder.decode(embedded)
assert decoded.shape == (batch_size, data.shape[1])
decoded_solo = autoencoder.decode(embedded[0])
assert decoded_solo.shape == (data.shape[1], )
# Test forwarding
forwarded = autoencoder.forward(data_batch)
assert torch.equal(decoded, forwarded)
Expand Down
29 changes: 29 additions & 0 deletions clustpy/deep/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from sklearn.mixture import GaussianMixture
from clustpy.partition import XMeans
from clustpy.deep.tests._helpers_for_tests import _get_dc_test_data
from clustpy.deep.neural_networks import ConvolutionalAutoencoder


def test_mean_squared_error():
Expand Down Expand Up @@ -71,6 +72,15 @@ def test_encode_batchwise():
desired = np.sum(data, axis=1).reshape((-1, 1))
desired = np.tile(desired, embedding_size)
assert np.allclose(encoded, desired, atol=1e-5)
# Test for Conv
X_images = np.array([[[[11] * 32] * 32, [[12] * 32] * 32, [[13] * 32] * 32],
[[[10] * 32] * 32, [[20] * 32] * 32, [[30] * 32] * 32],
[[[10] * 32] * 32, [[40] * 32] * 32, [[70] * 32] * 32],
[[[1] * 32] * 32, [[1] * 32] * 32, [[1] * 32] * 32]])
dataloader_images = _get_test_dataloader(X_images, 2, False, False)
autoencoder_images = ConvolutionalAutoencoder(32, [512, 10])
encoded_images = encode_batchwise(dataloader_images, autoencoder_images)
assert encoded_images.shape == (4, 10)


def test_predict_batchwise():
Expand All @@ -95,6 +105,15 @@ def test_decode_batchwise():
autoencoder = _TestAutoencoder(data.shape[1], embedding_size)
decoded = decode_batchwise(dataloader, autoencoder)
assert data.shape == decoded.shape
# Test for Conv
X_images = np.array([[[[11] * 32] * 32, [[12] * 32] * 32, [[13] * 32] * 32],
[[[10] * 32] * 32, [[20] * 32] * 32, [[30] * 32] * 32],
[[[10] * 32] * 32, [[40] * 32] * 32, [[70] * 32] * 32],
[[[1] * 32] * 32, [[1] * 32] * 32, [[1] * 32] * 32]])
dataloader_images = _get_test_dataloader(X_images, 2, False, False)
autoencoder_images = ConvolutionalAutoencoder(32, [512, 10])
decoded_images = decode_batchwise(dataloader_images, autoencoder_images)
assert X_images.shape == decoded_images.shape


def test_encode_decode_batchwise():
Expand All @@ -109,6 +128,16 @@ def test_encode_decode_batchwise():
desired = np.tile(desired, embedding_size)
assert np.allclose(encoded, desired, atol=1e-5)
assert data.shape == decoded.shape
# Test for Conv
X_images = np.array([[[[11] * 32] * 32, [[12] * 32] * 32, [[13] * 32] * 32],
[[[10] * 32] * 32, [[20] * 32] * 32, [[30] * 32] * 32],
[[[10] * 32] * 32, [[40] * 32] * 32, [[70] * 32] * 32],
[[[1] * 32] * 32, [[1] * 32] * 32, [[1] * 32] * 32]])
dataloader_images = _get_test_dataloader(X_images, 2, False, False)
autoencoder_images = ConvolutionalAutoencoder(32, [512, 10])
encoded_images, decoded_images = encode_decode_batchwise(dataloader_images, autoencoder_images)
assert encoded_images.shape == (4, 10)
assert X_images.shape == decoded_images.shape


def test_int_to_one_hot():
Expand Down
Loading
Loading