#default_exp datasets
Datasets
Includes the functions:
construct_diamond, make_diamonds, make_swiss_roll, make_tree, make_worm_data, make_eb_data, make_dyngen_data, relabel_data, rings, make_rings, and make_jacks
#hide
from nbdev.showdoc import *
#export
import os
import pandas as pd, numpy as np
import phate
from sklearn import datasets
import seaborn as sns
sns.color_palette("bright")
import matplotlib as mpl
#export
def construct_diamond(
points_per_petal:int=200,
petal_width:float=0.25,
direction:str='y'
):
'''
Arguments:
----------
        points_per_petal (int): Defaults to `200`. Number of points per petal.
petal_width (float): Defaults to `0.25`. How narrow the diamonds are.
direction (str): Defaults to 'y'. Options `'y'` or `'x'`. Whether to make vertical
or horizontal diamonds.
Returns:
---------
points (numpy.ndarray): the 2d array of points.
'''
n_side = int(points_per_petal/2)
axis_1 = np.concatenate((
np.linspace(0, petal_width, int(n_side/2)),
np.linspace(petal_width, 0, int(n_side/2))
))
axis_2 = np.linspace(0, 1, n_side)
axes = (axis_1, axis_2) if direction == 'y' else (axis_2, axis_1)
points = np.vstack(axes).T
points = np.vstack((points, -1*points))
points = np.vstack((points, np.vstack((points[:, 0], -1*points[:, 1])).T))
return points
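# A quick check of construct_diamond on its own (values here are illustrative):
#   pts = construct_diamond(points_per_petal=200, petal_width=0.25, direction='y')
#   pts.shape  # (400, 2): four mirrored petals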
def make_diamonds(
points_per_petal:int=200,
petal_width:float=0.25,
colors:int=5,
scale_factor:float=30,
use_gaussian:bool=True
):
'''
Arguments:
----------
        points_per_petal (int): Defaults to `200`. Number of points per petal.
        petal_width (float): Defaults to `0.25`. How narrow the diamonds are.
        colors (int): Defaults to `5`. The number of timesteps (colors) to produce.
        scale_factor (float): Defaults to `30`. How much to scale the noise by
            (larger values make smaller noise).
        use_gaussian (bool): Defaults to `True`. Whether to use Gaussian (`randn`) or uniform (`rand`) noise.
Returns:
---------
df (pandas.DataFrame): DataFrame with columns `samples`, `x`, `y`, where `samples`
are the time index (corresponds to colors)
'''
upper = construct_diamond(points_per_petal, petal_width, 'y')
lower = construct_diamond(points_per_petal, petal_width, 'x')
data = np.vstack((upper, lower))
noise_fn = np.random.randn if use_gaussian else np.random.rand
noise = noise_fn(*data.shape) / scale_factor
data = data + noise
df = pd.DataFrame(data, columns=['d1', 'd2'])
c_values = np.linspace(colors, 1, colors)
c_thresholds = np.linspace(1, 0+1/(colors+1), colors)
    df.insert(0, 'samples', colors)
for value, threshold in zip(c_values, c_thresholds):
index = ((np.abs(df.d1) <= threshold) & (np.abs(df.d2) <= threshold))
df.loc[index, 'samples'] = value
    return df
df = make_diamonds(200, 0.25, 5)
sns.scatterplot(data=df, x='d1', y='d2', hue='samples', palette='viridis')

#export
def make_swiss_roll(n_points=1500):
'''
Arguments:
----------
        n_points (int): Defaults to `1500`. Number of points to generate.
Returns:
---------
df (pandas.DataFrame): DataFrame with columns `samples`, `d1`, `d2`, `d3`,
where `samples` are the time index (corresponds to colors)
'''
X, color = datasets.make_swiss_roll(n_samples=n_points)
df = pd.DataFrame(np.hstack((np.round(color).reshape(-1, 1), X)), columns='samples d1 d2 d3'.split())
df.samples -= np.min(df.samples)
    return df
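A quick look at the swiss roll, colored by the rounded time label (a minimal sketch using the imports above):
df_roll = make_swiss_roll(1500)
sns.scatterplot(data=df_roll, x='d1', y='d3', hue='samples', palette='viridis')
#export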
def make_tree():
'''
    Arguments:
    ----------
        None.
    Returns:
    ---------
        df (pandas.DataFrame): DataFrame with columns `samples`, `d1`, `d2`, `d3`,
            `d4`, `d5`, where `samples` is the branch label (corresponds to colors)
'''
tree, branches = phate.tree.gen_dla(
n_dim = 200, n_branch = 10, branch_length = 300,
rand_multiplier = 2, seed=37, sigma = 5
)
phate_operator = phate.PHATE(n_components=5, n_jobs=-1)
tree_phate = phate_operator.fit_transform(tree)
df = pd.DataFrame(np.hstack((branches.reshape(-1, 1), tree_phate)), columns='samples d1 d2 d3 d4 d5'.split())
    return df
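make_tree runs a full PHATE fit over a 200-dimensional DLA tree, so it can take a little while; a minimal usage sketch:
df_tree = make_tree()
sns.scatterplot(data=df_tree, x='d1', y='d2', hue='samples', palette='viridis')
#export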
from MIOFlow.constants import WORM_FILE
def make_worm_data():
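    '''
    Loads the bundled worm embedding from `WORM_FILE`.
    Returns:
    ---------
        df (pandas.DataFrame): DataFrame with columns `samples`, `d1`, ..., `d5`,
            where `samples` are the sample (time) labels.
    '''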
data = np.load(WORM_FILE)
sample_labels = data['sample_labels']
embedding = data['embedding']
df = pd.concat([
pd.DataFrame(sample_labels, columns=['samples']),
pd.DataFrame(embedding, columns=list(map(lambda e: f'd{e}', '12345')))
], axis=1,
)
    return df
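make_worm_data just unpacks the bundled embedding, so usage is direct (a sketch; assumes WORM_FILE points at the packaged archive):
df_worm = make_worm_data()
df_worm.head()
#export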
from MIOFlow.constants import (
    EB_BODIES_FILE, EB_BODIES_PSEUDO_4, EB_BODIES_PSEUDO_6,
    EB_BODIES_PSEUDO_25, EB_BODIES_PSEUDO_82
)
def make_eb_data(phate=False, phate_dims=5, n_sample='all', random_state=1):
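    '''
    Loads the EB (embryoid body) PCA embedding and pseudotime labels.
    Arguments:
    ----------
        phate (bool): Defaults to `False`. Whether to re-embed the first ten PCA
            dimensions with PHATE.
        phate_dims (int): Defaults to `5`. Number of PHATE components.
        n_sample (int or 'all'): Defaults to `'all'`. Number of cells to subsample.
        random_state (int): Defaults to `1`. Seed for subsampling.
    Returns:
    ---------
        df (pandas.DataFrame): DataFrame with a `samples` column, embedding columns
            `d1`, `d2`, ..., and pseudotime columns (`pt4`, `pt6`, `pt25`, `pt82`).
    '''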
data = np.load(EB_BODIES_FILE)
sample_labels = data['sample_labels']
embedding = data['pca']
df = pd.DataFrame(embedding, columns=[f'd{i}' for i in range(1, 101)])
df['samples'] = sample_labels
df['pt4'] = np.load(EB_BODIES_PSEUDO_4)
df['pt6'] = np.load(EB_BODIES_PSEUDO_6)
df['pt25'] = np.load(EB_BODIES_PSEUDO_25)
df['pt82'] = np.load(EB_BODIES_PSEUDO_82)
    if n_sample != 'all' and not phate:
        df = df.sample(n=n_sample, random_state=random_state)
    if phate:
        from phate import PHATE
        phate_operator = PHATE(phate_dims, n_jobs=-1)
        # Subsample (if requested) and reset the index so the concat below aligns.
        sub_sample = df if n_sample == 'all' else df.sample(n=n_sample, random_state=random_state)
        sub_sample = sub_sample.reset_index(drop=True)
        # PHATE is fit on the first ten PCA dimensions.
        Y_phate = phate_operator.fit_transform(sub_sample[[f'd{i}' for i in range(1, 11)]])
        df = pd.concat([
            pd.DataFrame(sub_sample.samples.values, columns=['samples']),
            pd.DataFrame(Y_phate, columns=[f'd{i}' for i in range(1, phate_dims+1)]),
            sub_sample['pt4'], sub_sample['pt6'], sub_sample['pt25']
        ], axis=1)
    return df
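A usage sketch for make_eb_data (requires the EB data files referenced in MIOFlow.constants; the sample size here is illustrative):
df_eb = make_eb_data(phate=False, n_sample=5000)
df_eb.head()
#export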
from MIOFlow.constants import (DYNGEN_INFO_FILE, DYNGEN_EXPR_FILE)
from phate import PHATE
import warnings
def make_dyngen_data(
time_col='sim_time', phate_dims=10, round_labels=True,
use_gaussian:bool=False, add_noise=False, add_noise_after_phate=False,
scale_factor:float=1, scale_phate=100, n_bins=5, column='d1'
):
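    '''
    Loads the dyngen expression data, optionally adds noise, embeds it with PHATE
    (scaled by `scale_phate`), and bins the time labels via `relabel_data`.
    Arguments:
    ----------
        time_col (str): Defaults to `'sim_time'`. One of `'simulation_i'`, `'step_ix'`,
            or `'sim_time'`; invalid values fall back to `'simulation_i'`.
        phate_dims (int): Defaults to `10`. Number of PHATE components.
        round_labels (bool): Defaults to `True`. Rescale labels from 0-1000 to 0-10
            and bin them with `relabel_data`.
        use_gaussian (bool): Defaults to `False`. Whether to use Gaussian (`randn`) or
            uniform (`rand`) noise.
        add_noise (bool): Defaults to `False`. Whether to add noise at all.
        add_noise_after_phate (bool): Defaults to `False`. Add the noise to the PHATE
            embedding rather than to the raw expression values.
        scale_factor (float): Defaults to `1`. Noise is divided by this value.
        scale_phate (float): Defaults to `100`. Multiplier for the PHATE embedding.
        n_bins (int): Defaults to `5`. Number of bins for `relabel_data`.
        column (str): Defaults to `'d1'`. Column used for binning in `relabel_data`.
    Returns:
    ---------
        df (pandas.DataFrame): DataFrame with columns `samples`, `d1`, ..., `d{phate_dims}`.
    '''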
_valid = 'simulation_i step_ix sim_time'.split()
if time_col not in _valid:
time_col = _valid[0]
noise_fn = np.random.randn if use_gaussian else np.random.rand
    exp = pd.read_csv(DYNGEN_EXPR_FILE)
if add_noise and not add_noise_after_phate:
noise = noise_fn(*exp.shape) / scale_factor
exp += noise
ids = pd.read_csv(DYNGEN_INFO_FILE, skipfooter=1, engine='python').dropna(axis=1)
df = pd.concat([ids, exp], axis=1).set_index('cell_id')
df['samples'] = df[time_col]
df = df.drop(columns=_valid)
phate_operator = PHATE(phate_dims, n_jobs=-1)
Y_phate = phate_operator.fit_transform(df.drop(columns=['samples']))
Y_phate *= scale_phate
if add_noise and add_noise_after_phate:
noise = noise_fn(*Y_phate.shape) / scale_factor
Y_phate += noise
df = pd.concat([
pd.DataFrame(df.samples.values, columns=['samples']),
pd.DataFrame(Y_phate, columns=list(map(lambda e: f'd{e}', range(1, phate_dims+1))))
], axis=1)
if round_labels:
        # rescale labels from 0-1000 down to 0-10
        df.samples = np.round(df.samples, -2) / 100
df = relabel_data(df, min_bin=0, n_bins=n_bins, column=column)
if phate_dims in [2,5]:
locs = (df['d1'] <= -2.0)
df.loc[locs, 'samples'] = -1
df.drop(df[df['samples'] == -1].index, inplace = True)
elif phate_dims in [10,15,30,40,60]:
locs = (df['d1'] <= -1.9)
df.loc[locs, 'samples'] = -1
df.drop(df[df['samples'] == -1].index, inplace = True)
else:
        warnings.warn("Not tested for this 'phate_dims'; using the same threshold as for the [10, 15, 30, 40, 60] dims.")
locs = (df['d1'] <= -1.9)
df.loc[locs, 'samples'] = -1
df.drop(df[df['samples'] == -1].index, inplace = True)
return df
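# Example usage (a sketch; requires the dyngen CSVs referenced in MIOFlow.constants):
#   df = make_dyngen_data(time_col='sim_time', phate_dims=10)
#   df.samples.unique()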
def relabel_data(df,min_bin=0, n_bins=10, column='d1', samples_key='samples'):
dff = df.copy()
x_min = np.min(dff[column])
x_max = np.max(dff[column])
parts = np.linspace(x_min, x_max, n_bins+1)
value = list(range(min_bin, n_bins+1, 1))
for i, x in list(zip(value, parts))[::-1]:
if i == 0:
continue
locs = (dff[column] <= x)
dff.loc[locs, samples_key] = i
    return dff
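relabel_data bins the values of one column into integer labels 1..n_bins; a minimal sketch on synthetic data (the `_demo` frame is illustrative):
_demo = pd.DataFrame({'samples': 0, 'd1': np.linspace(-1, 1, 10)})
relabel_data(_demo, n_bins=5)['samples'].values
#export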
import matplotlib.pyplot as plt
def rings(
N:int, M:int = None,
data_scale:float = 1,
add_noise:bool = True,
noise_scale_theta:float = 0.7,
noise_scale_radius:float = 0.03,
buffer:float = 0.8,
**kwargs
) -> (np.ndarray, np.ndarray):
    '''
    Generate the 'rings' (petal) data set: M circles arranged around the unit circle.
    Arguments:
        N (int): Number of points to make.
        M (int): Defaults to `None`. If `None`, the number of circles is determined
            automatically as `floor(sqrt(N))`.
        data_scale (float): Defaults to `1`. Multiplier to rescale the data.
        add_noise (bool): Defaults to `True`. Whether or not to add noise to the data.
        noise_scale_theta (float): Defaults to `0.7`. How much to scale the noise added to `theta`.
        noise_scale_radius (float): Defaults to `0.03`. How much to scale the noise added to `radius`.
        buffer (float): Defaults to `0.8`. How much to scale the `radius` to add some padding between circles.
        **kwargs
    Returns:
        X (np.ndarray): The x, y coordinates for the points.
        C (np.ndarray): The cluster number of each point.
    '''
X = [] # points in respective petals
Y = [] # auxiliary array (points on outer circle)
C = []
assert N > 4, "Require more than four data points"
    # Number of 'petals' to put into the data set. This is required to
    # ensure that the full space is used.
if M is None:
M = int(np.floor(np.sqrt(N)))
thetas = np.linspace(0, 2 * np.pi, M, endpoint=False)
for theta in thetas:
Y.append(np.asarray([np.cos(theta), np.sin(theta)]))
# Radius of the smaller cycles is half of the chord distance between
# two 'consecutive' points on the circle.
radius = 0.5 * np.linalg.norm(Y[0] - Y[1])
for i, x in enumerate(Y):
for theta in thetas:
for j in range(N // M // len(thetas)):
r = radius if not add_noise else radius + np.random.randn() * noise_scale_radius
t = theta if not add_noise else theta + np.random.randn() * noise_scale_theta
r *= buffer
X.append(np.asarray([r * np.cos(t) - x[0], r * np.sin(t) - x[1]]))
# Indicates that this point belongs to the $i$th circle.
C.append(i)
X = np.asarray(X)
C = np.asarray(C)
X *= data_scale
return X, C
def make_rings(N:int, M:int = None,
data_scale:float = 1,
add_noise:bool = True,
noise_scale_theta:float = 0.7,
noise_scale_radius:float = 0.03,
buffer:float = 0.8,
**kwargs
) -> pd.DataFrame:
    '''
    Arguments:
        N (int): Number of points to make.
        M (int): Defaults to `None`. If `None`, the number of circles is determined
            automatically as `floor(sqrt(N))`.
        data_scale (float): Defaults to `1`. Multiplier to rescale the data.
        add_noise (bool): Defaults to `True`. Whether or not to add noise to the data.
        noise_scale_theta (float): Defaults to `0.7`. How much to scale the noise added to `theta`.
        noise_scale_radius (float): Defaults to `0.03`. How much to scale the noise added to `radius`.
        buffer (float): Defaults to `0.8`. How much to scale the `radius` to add some padding between circles.
        **kwargs
    Returns:
        df (pandas.DataFrame): DataFrame with columns `samples`, `d1`, `d2`, where
            `samples` is the cluster number of each point.
    '''
x, c = rings(N, M, data_scale, add_noise, noise_scale_theta, noise_scale_radius, buffer)
df = pd.DataFrame(x, columns=[f'd{i+1}' for i in range(x.shape[1])])
df['samples'] = c
    return df
df = make_rings(
2000, 6,
data_scale = 5,
add_noise = True, noise_scale_theta = 0.7,
noise_scale_radius = 0.03,
buffer = 0.8,
)
df.head()
plt.figure(figsize=(12, 8))
sns.scatterplot(
data=df, x='d1', y='d2',
hue='samples', palette='viridis',
size='samples', sizes=(100, 100),
)

#export
def make_jacks(
n_axes = 3,
points = 1000,
label_by = 'axis',
n_classes = 3,
use_neg = True,
data_scale = 3,
add_noise = True,
noise_scale = 0.03,
):
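    '''
    Creates a 'jacks' data set: `n_axes` unit segments along the coordinate axes
    (and their negatives if `use_neg=True`), with optional Gaussian noise.
    Arguments:
    ----------
        n_axes (int): Defaults to `3`. Number of coordinate axes (and dimensions).
        points (int): Defaults to `1000`. Total number of points, split evenly across axes.
        label_by (str): Defaults to `'axis'`. Options `'axis'` or `'coord'`. Label points
            by which axis they lie on, or by position along the axis.
        n_classes (int): Defaults to `3`. Number of labels per axis when `label_by='coord'`.
        use_neg (bool): Defaults to `True`. Whether to also generate the negative half-axes.
        data_scale (float): Defaults to `3`. Multiplier to rescale the data.
        add_noise (bool): Defaults to `True`. Whether or not to add Gaussian noise.
        noise_scale (float): Defaults to `0.03`. How much to scale the noise.
    Returns:
    ---------
        df (pandas.DataFrame): DataFrame with columns `samples`, `d1`, ..., `d{n_axes}`.
    '''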
_valid_label_bys = 'axis coord'.split()
if label_by not in _valid_label_bys:
label_by = _valid_label_bys[0]
results = []
classes = []
axes = np.eye(n_axes)
for i, axis in enumerate(axes):
segment = np.linspace(0, 1, points // n_axes).reshape(-1, 1)
if add_noise:
coordinates = axis * (segment + np.random.randn(segment.size, 1) * noise_scale)
else:
coordinates = axis * segment
results.extend(coordinates.tolist())
if label_by == 'axis':
            labels = [i] * len(segment)
classes.extend(labels)
elif label_by == 'coord':
labels = [
k for k in range(n_classes)
for j in range(points // n_axes // n_classes)
]
for j in range(len(segment) - len(labels)):
labels.append(n_classes - 1)
classes.extend(labels)
if use_neg:
for i, axis in enumerate(axes):
segment = np.linspace(0, 1, points // n_axes).reshape(-1, 1) * -1
if add_noise:
coordinates = axis * (segment + np.random.randn(segment.size, 1) * noise_scale)
else:
coordinates = axis * segment
results.extend(coordinates.tolist())
if label_by == 'axis':
            labels = [n_axes + i] * len(segment)
classes.extend(labels)
elif label_by == 'coord':
labels = [
k for k in range(n_classes)
for j in range(points // n_axes // n_classes)
]
for j in range(len(segment) - len(labels)):
labels.append(n_classes - 1)
classes.extend(labels)
    results = np.array(results)
    if add_noise:
        results += np.random.randn(len(results), n_axes) * noise_scale
    results *= data_scale
df = pd.DataFrame(results, columns=[f'd{i+1}' for i in range(n_axes)])
df['samples'] = classes
    return df
df = make_jacks(
n_axes = 3,
points = 1000,
label_by = 'axis',
n_classes = 3,
use_neg = True,
data_scale = 3,
add_noise = True,
noise_scale = 0.03,
)
fig = plt.figure(figsize=(12, 8))
ax = plt.axes(projection='3d')
ax.scatter(df.d1, df.d2, df.d3, c=df.samples)
