Table of Contents

-- mode: Org; fill-column: 110; coding: utf-8; -- #+TITLE Python for data science

1. NumPy

1.1. frequent operations on shape

1.1.1. reshape((-1, 1)

import numpy as np
x = np.array([1,2,3,4,5])
print(np.concatenate((x, x)).reshape((-1, 1)))
[[1]
 [2]
 [3]
 [4]
 [5]
 [1]
 [2]
 [3]
 [4]
 [5]]

1.2. theory

[ˈnʌmpaɪ] large, multi-dimensional arrays and matrices. BSD-new license. multi-dimensional container of generic data

  • a powerful N-dimensional array object
  • sophisticated (broadcasting) functions
  • useful linear algebra, Fourier transform, and random number capabilities

ndarray - n-dimensional array

  • homogeneously typed: all elements of a single array must be of the same type
  • np.pad(…) routine to extend arrays actually creates new arrays of the desired shape and padding values, copies the given array into the new one and returns it

Type hint

def f(x: np.ndarray) -> np.ndarray

… = : - Ellipse ones[:,5] - пятый слобец

1.3. shape size dtype etc:

  • ndarray.shape
  • ndarray.size - произведение чисел в shape
  • ndarray.dtype - bool_, character, int8, int16, int32, int64, float8, float16, float32, float64, complex64, object_
  • ndarray.itemsize - размер элемента в байтах
  • ndarray.data - обратно в python - не рекомендуется пользоваться

1.4. basic

import numpy as np
a = np.array([1, 2, 3])
a[[1,2]] # array([2, 3])

>>> np.arange(4).reshape((2,2))
array([[0, 1],
       [2, 3]])
>>> a = np.arange(4).reshape((2,2))
>>> a
array([[0, 1],
       [2, 3]])
>>> a.sum(axis=0)
array([2, 4])
>>> a.sum(1)
array([1, 5])
>>> a.sum(-1)
array([1, 5])

x = np.array([[1,2],[3,4]])
x[:,0] # array([1, 3])

np.zeros((3, 5), dtype=float) # dtype - по умолчанию float
np.ones((2, 2, 2)) # all 1
np.eye(5) # единицы на диагонали
np.empty((3, 3)) # случайное какая была память так и заполнилась
np.arange(10, 30, 5) # range
np.linspace(0, 2, 9) # от 0 до 2 - создать 9 штук
np.logspace(start, stop, num=50, endpoint=True, base=10.0) # base**start - base ** stop с ускорением

np.amax(nparray) # max element
np.amin(nparray) # min element
np.nanmin(data[:, 1]) # max element at column 1

self.img[:] = 255 # replace every element with single value
# filter None elements:
self.contours = np.array(list(filter(lambda x:x is not None, self.contours)))

#
a = np.linspace(-np.pi, np.pi, 100)
b = np.sin(a)
c = np.cos(a)

# Linear algebra
from numpy.random import rand
from numpy.linalg import solve, inv
a = np.array([[1, 2, 3], [3, 4, 6.7], [5, 9.0, 5]])
a.transpose()
array([[ 1. ,  3. ,  5. ],
       [ 2. ,  4. ,  9. ],
       [ 3. ,  6.7,  5. ]])
inv(a)
array([[-2.27683616,  0.96045198,  0.07909605],
       [ 1.04519774, -0.56497175,  0.1299435 ],
       [ 0.39548023,  0.05649718, -0.11299435]])
b =  np.array([3, 2, 1])
solve(a, b)  # solve the equation ax = b
array([-4.83050847,  2.13559322,  1.18644068])

c = rand(3, 3) * 20  # create a 3x3 random matrix of values within [0,1] scaled by 20
array([[  3.98732789,   2.47702609,   4.71167924],
       [  9.24410671,   5.5240412 ,  10.6468792 ],
       [ 10.38136661,   8.44968437,  15.17639591]])
np.dot(a, c)  # matrix multiplication
a @ c # Starting with Python 3.5 and NumPy 1.10

# per column operations
data[:, 1] = (data[:, 1] - data_min)
data[:,1] +=1

# Add dimension
x = np.expand_dims(x, axis=0)
x = x[np.newaxis, :]

# elemets at positons
a = a[np.array([1, 2, 10, 3])]

1.5. masking and comparision

  • x>1 - Boolean array indexing [True, False]
  • x[x>1] - select elements with True
  • (a[1,:]!=2) & (a[1,:]!=2) - and
  • cv2.bitwise_not(gray)
a = array([1, 2, 3, 4, 4])
# get elements where >2
a[np.where( a > 2)]
>> array([1, 2, 3, 4, 4])
a[a > 2]
>> array([1, 2, 3, 4, 4])

1.6. LOOPING

substarct every [9,3,6] from [1,2,3,4,5,6] and find min of abs:

import numpy as np
c = [1,2,3,4,5,6]
s = [9,3,6]
su = np.repeat([c],len(s),axis=0).T - s
m = np.min(np.abs(su), axis=0)
print(m)

1.7. replace

my_array[my_array == 8] = 20
my_array[(my_array > 8) | (my_array < 6)] = 20
result= np.where(new_array==np.inf, 0, new_array)
# inf
result=np.where(np.isinf(a), 999999, a)
result=np.where(np.isnan(a), 0, a)
np.place(new_values, new_values<0, [0])

1.8. round округление

a = np.array([1.1, 1.5, 1.9], float)
>>> np.floor(a)
array([ 1.,  1.,  1.])
>>> np.ceil(a)
array([ 2.,  2.,  2.])
>>> np.rint(a)
array([ 1.,  2.,  2.])

1.9. keras.utils.to_categorical

1.9.1. basic

y_classes = keras.utils.to_categorical(range(len(paths))) # classes array in one-hot
train_y.append(y_classes[i]) #to set
# back
out = model.predict
i = np.argmax(out, axis=-1)[0] #id
paths[i] # original

1.9.2. add sum category

>>> c
array([[1., 0.],
       [0., 1.]], dtype=float32)
np.append(c, [c[0]+c[1]], axis=0)
# result:
array([[1., 0.],
       [0., 1.],
       [1., 1.]], dtype=float32)

1.10. save and saves

np.save('123', data) # 123.npy
data = np.load('../123.npy', mmap_mode=None)

1.11. ignore items on diagonal

not_diag = np.where(~np.eye(dists.shape[0],dtype=bool))
cl_distance = np.mean(dists[not_diag]) # mean mey be replace with something close to median

1.12. get items below diagonal (triangleform from squareform)

get upper triangleform:

C3 = np.triu(C2)

ge lower triangleform:

C3 = np.tril(C2)

get elements:

arr2 = np.where(np.tri(arr.shape[0],arr.shape[1], k = -1) == 1)

2. pandas

2.1. read csv

pd.read_csv(p, index_col=0, sep='\t')
  • sep='\t' иногда встречается разделение столбцов по \t. обычно запятой

2.2. sort

df.sort_values(by=df['Клиент'], axis=1) # 0 we gave columns, 1 we gave row indexes and sort columns

2.3. replace value

  1. new column must be created
df.loc[df.Followers == 'N/A', 'Followers'] = np.nan
  1. can use regex
df['Followers'].replace(to_replace='N/A', value=np.nan)
  1. can use any function

3.1) on series

df['holiday'] = df['holiday'].apply(lambda x: 1 if x != 0 else 0)

3.2) raw=True gives big speed up

df.apply(lambda row: sum_square(row[0], row[1]), raw=True, axis=1 )
  1. convert DataFrame to numpy

2.4. analysis

import pandas as pd
AH = pd.read_csv('a.csv', header=0, index_col = False)
print(df.head()) # first 5 lines
print(df.shape)
print(df.dtypes.to_string()) # типы всех! столбцов
print(df.columns) # названия всех! столбцов
print(df.iloc[:]) # названия всех! столбцов
print(df['birth_date']) # one column values
print(df.isnull().values.any()) # any NaN?
print(df.describe(include='all')) # pre column: unique, mean, std, min, квантиль
df.iloc[1, :].value_counts() #100    1  400    1  300    1  200    1
df.iloc[1, :].value_counts(normalize=True) #100    0.25  400    0.25  300    0.25 200    0.25

# Categories and Uniques
Categorial or not. Unique Values
categorial_columns = [c for c in data.columns if data[c].dtype.name == 'object']
categorial_columns = df.select_dtypes(include=["object"]).columns # or
numerical_columns = [c for c in data.columns if data[c].dtype.name != 'object']
numerical_columns = df.select_dtypes(exclude=["object"]).columns # or
print(data[categorial_columns].describe())
# unique
: for c in categorial_columns:
:    print(c, data[c].unique())

# histogram

import matplotlib
matplotlib.use('TkAgg')
from matplotlib import pyplot as plt

AH['SalePrice'].hist(bins = 60, normed=1) # calls matplotlib.pyplot.hist
plt.show()

# plot столбец

sales.iloc[:,1].plot()

2.5. Series

One-dimensional ndarray with axis labels

combine along index

  • pd.concat([s1,s2], axis=1)

for dataframes merge:

  • df1.reset_index()
  • df2.reset_index()
  • df1.merge(df2)
mydict = [{'a': 1, 'b': 2, 'c': 3, 'd': 4},
          {'a': 100, 'b': 200, 'c': 300, 'd': 400},
          {'a': 1000, 'b': 2000, 'c': 3000, 'd': 4000 }]
df = pd.DataFrame(mydict)

df.iloc[0] # {'a': 1, 'b': 2, 'c': 3, 'd': 4}
type(df.iloc[0]) # <class 'pandas.core.series.Series'>
df.iloc[[0,1,2]] == df == df.iloc[:3]
df.iloc[0, 1] # 2
df.values # convert to numpy

2.6. DataFrame

Two-dimensional, size-mutable data. Container for Series objects

# 1) way
d = {'col1': [1, 2], 'col2': [3, 4]}
s1 = pd.DataFrame(data=d)
# 2) way
staff = [(col, melb_df[col].nunique(),melb_df[col].dtypes)]
unique_counts = pd.DataFrame(
    staff,
    columns=['Column_Name', 'Num_Unique', 'Type']
).sort_values(by='Num_Unique',  ignore_index=True)

2.7. index and levels

  • default - created autoincrement int
  • df.set_index('c')
  • df.reset_index(drop=True, inplace=True) - index to column, create new index, default: drop=False
  • df.index = Series - ad hoc index
  • df.index.name - index column name

index and columns may have multiple levels

  • multilevel index reated by groupby
  • df.loc[index, (column|:)] - get values at index
  • df.iloc[integer] - get values at position

2.8. WHERE AND FILTERS

https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#boolean-indexing methods

- loc - df.loc[(df['Salary_in_1000']>=100) & (df['Age']< 60) & (df['FT_Team'].str.startswith('S')),['Name','FT_Team']]
- df.index[(df['Salary_in_1000']>=100) & (df['Age']< 60)]
- numpy where
  - idx = np.where((df['Salary_in_1000']>=100) & (df['Age']< 60) & (df['FT_Team'].str.startswith('S')))
  - df.loc[idx]
- Query - df.query('Salary_in_1000 >= 100 & Age < 60 & FT_Team.str.startswith("S").values')
- Boolean Indexing - df[(df['Salary_in_1000']>=100) & (df['Age']<60) & df['FT_Team'].str.startswith('S')][['Name','Age','Salary_in_1000']]
- eval - df[df.eval("Salary_in_1000>=100 & (Age <60) & FT_Team.str.startswith('S').values")]

bool - | or, & and, ~ not

# DATΑFRAME --------
df.shop_id.nunique()

df[df>100] # nan, nan, 101
df[df.shop_id > 20] # filter works!

# making boolean series for a team name
filter1 = data["Team"]=="Atlanta Hawks"
# making boolean series for age
filter2 = data["Age"]>24
# filtering data on basis of both filters
data.where(filter1 & filter2, inplace = True)

# SERIES -------------
s = pd.Series(range(5)) # 0,1,2,3,4
s.where(s>1,-1)  # -1, -1, 2, 3, 4
s.mask(s>1, -1)  # 0, 1, -1, -1, -1

s[s>2] # 3, 4

2.8.1. filter by date

 df = df.dropna(subset=['Дата_заключения_контракта_d'])
 d0101 = pd.to_datetime('20190101', format='%Y%m%d', errors='ignore')
 d0731 = pd.to_datetime('20190731', format='%Y%m%d', errors='ignore')
 df = df[d0101 >= df['Дата_заключения_контракта_d'] >= d0731]

2.9. COUNT

2.9.1. get unique rows with count

a = pd.DataFrame(a.groupby(['Коды отказа', 'Описание кодов отказа']).size().reset_index(name="count"))
a = pd.DataFrame(a)
c_row = a.pop('count')
a.insert(0, 'count', c_row)
a.sort_values(by=['count'], ascending=False).to_csv('kod_otkaza.csv')

2.9.2. count example

# Person   Age  Single
# 0    John  24.0   False
# 1    Myla   NaN    True
# 2   Lewis  21.0    True
# 3    John  33.0    True
# 4    Myla  26.0   False

# create multiindex and count
df.set_index(["Person", "Single"]).count(level="Person")
# John      2
# Lewis     1
# Myla      1

df.set_index(["Person", "Single"]).count(level="Single")
# False     2
# True      2

2.9.3. most frequent

pd.Series([2,3,4,5,6].value_counts().idxmax()

2.10. RESHAPINGS guide https://pandas.pydata.org/docs/user_guide/reshaping.html

2.10.1. Resample for timeseries

  • 'M' - month boundary
  • 'A' - annual
loan_rev_data=data['Loan Amount']
loan_rev_data['date'] = pd.DatetimeIndex(data['Created Date'])
loan_rev_data = loan_rev_data.set_index('date')
monthly_loan_rev_data= loan_rev_data.resample('M').sum()
            Loan Amount
date
2014-10-31  13039283.00
2014-11-30  16097733.00
2014-12-31  29077334.00

2.10.2. pivot - rows to columns without aggregation

Uses unique values from specified index / columns to form axes of the resulting DataFrame

params: index, columns, values

import pandas as pd
df = pd.DataFrame({'foo': ['one', 'one', 'one', 'two', 'two','two'],
                   'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
                   'baz': [1, 2, 3, 4, 5, 6],
                   'zoo': ['x', 'y', 'z', 'q', 'w', 't']})
print(df)
print()
print(df.pivot(index='foo', columns='bar', values='baz'))
   foo bar  baz zoo
0  one   A    1   x
1  one   B    2   y
2  one   C    3   z
3  two   A    4   q
4  two   B    5   w
5  two   C    6   t

bar  A  B  C
foo
one  1  2  3
two  4  5  6

Possible misstakes example:

import pandas as pd
df = pd.DataFrame({"foo": ['one', 'one', 'two', 'two'],
                   "bar": ['A', 'A2', 'B', 'C'], # new columns should not have duplicates in one index
                   "baz": [1, 2, 3, 4]})
print(df.pivot(index='foo', columns='bar', values='baz'))
bar    A   A2    B    C
foo
one  1.0  2.0  NaN  NaN
two  NaN  NaN  3.0  4.0

2.10.3. stack (levels)

import pandas as pd
df_single_level_cols = pd.DataFrame([[0, 1], [2, 3]],
                                    index=['cat', 'dog'],
                                    columns=['weight', 'height'])
print(df_single_level_cols)
print()
print(df_single_level_cols.stack())
     weight  height
cat       0       1
dog       2       3

cat  weight    0
     height    1
dog  weight    2
     height    3
dtype: int64

2.10.4. melt - columns to rows

  1. ex1
    import pandas as pd
    df = pd.DataFrame(
        {
            "first": ["John", "Mary"],
            "last": ["Doe", "Bo"],
            "height": [5.5, 6.0],
            "weight": [130, 150],
        })
    print(df)
    print()
    print(df.melt(id_vars=["first", "last"]))
    
      first last  height  weight
    0  John  Doe     5.5     130
    1  Mary   Bo     6.0     150
    
      first last variable  value
    0  John  Doe   height    5.5
    1  Mary   Bo   height    6.0
    2  John  Doe   weight  130.0
    3  Mary   Bo   weight  150.0
    
  2. ex2
    import pandas as pd
    df = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                       'B': {0: 1, 1: 3, 2: 5},
                       'C': {0: 2, 1: 4, 2: 6}})
    print(df)
    print()
    print(pd.melt(df, id_vars=['A'], value_vars=['B']))
    
       A  B  C
    0  a  1  2
    1  b  3  4
    2  c  5  6
    
       A variable  value
    0  a        B      1
    1  b        B      3
    2  c        B      5
    

2.10.5. pivot_table - allow aggs

  1. ex1
    import pandas as pd
    import numpy as np
    import datetime
    df = pd.DataFrame(
        {
            "A": ["one", "one", "two", "three"] * 6,
            "B": ["A", "B", "C"] * 8,
            "C": ["foo", "foo", "foo", "bar", "bar", "bar"] * 4,
            "D": np.random.randn(24),
            "E": np.random.randn(24),
            "F": [datetime.datetime(2013, i, 1) for i in range(1, 13)]
            + [datetime.datetime(2013, i, 15) for i in range(1, 13)],
        })
    print(df)
    print()
    print(pd.pivot_table(df, values="D", index=["A", "B"], columns=["C"]))
    print()
    print(pd.pivot_table(df, values="D", index=["B"], columns=["A", "C"], aggfunc=np.sum))
    
            A  B    C         D         E          F
    0     one  A  foo  0.834789 -0.268575 2013-01-01
    1     one  B  foo -0.332062 -0.324379 2013-02-01
    2     two  C  foo -2.095669 -2.186134 2013-03-01
    3   three  A  bar -0.793498  0.126653 2013-04-01
    4     one  B  bar  0.117796 -0.845898 2013-05-01
    5     one  C  bar  1.016105 -0.369420 2013-06-01
    6     two  A  foo  1.151064 -0.698485 2013-07-01
    7   three  B  foo -0.487159  0.123010 2013-08-01
    8     one  C  foo -1.456931  1.230448 2013-09-01
    9     one  A  bar -0.591074 -0.851506 2013-10-01
    10    two  B  bar  1.332696  0.161591 2013-11-01
    11  three  C  bar  0.033348 -0.187387 2013-12-01
    12    one  A  foo -1.159041  0.321096 2013-01-15
    13    one  B  foo  0.353786  0.724629 2013-02-15
    14    two  C  foo -1.765572 -0.708540 2013-03-15
    15  three  A  bar  0.805330 -0.652539 2013-04-15
    16    one  B  bar -0.124616  0.014006 2013-05-15
    17    one  C  bar -0.052215 -0.168125 2013-06-15
    18    two  A  foo  0.921741  0.280954 2013-07-15
    19  three  B  foo -0.584663  0.727251 2013-08-15
    20    one  C  foo -1.740931  1.516952 2013-09-15
    21    one  A  bar -0.189743 -0.515618 2013-10-15
    22    two  B  bar -0.099166  0.002090 2013-11-15
    23  three  C  bar -0.487092 -0.996470 2013-12-15
    
    C             bar       foo
    A     B
    one   A -0.390408 -0.162126
          B -0.003410  0.010862
          C  0.481945 -1.598931
    three A  0.005916       NaN
          B       NaN -0.535911
          C -0.226872       NaN
    two   A       NaN  1.036402
          B  0.616765       NaN
          C       NaN -1.930620
    
    A       one               three                two
    C       bar       foo       bar       foo      bar       foo
    B
    A -0.780817 -0.324252  0.011831       NaN      NaN  2.072805
    B -0.006820  0.021724       NaN -1.071822  1.23353       NaN
    C  0.963890 -3.197862 -0.453743       NaN      NaN -3.861240
    
  2. ex2
    import pandas as pd
    import numpy as np
    print(pd.pivot_table(df[["A", "B", "C", "D", "E"]], index=["A", "B"], columns=["C"]))
    print()
    print(pd.pivot_table(df, values="D", index=pd.Grouper(freq="M", key="F"), columns="C"))
    print()
    table = pd.pivot_table(df, index=["A", "B"], columns=["C"], values=["D", "E"])
    print(table.to_string(na_rep=""))
    print()
    table = df.pivot_table(
        index=["A", "B"],
        columns="C",
        values=["D", "E"],
        margins=True,
        aggfunc=np.std)
    print(table)
    print()
    print(table.stack())
    
                    D                   E
    C             bar       foo       bar       foo
    A     B
    one   A -0.390408 -0.162126 -0.683562  0.026260
          B -0.003410  0.010862 -0.415946  0.200125
          C  0.481945 -1.598931 -0.268773  1.373700
    three A  0.005916       NaN -0.262943       NaN
          B       NaN -0.535911       NaN  0.425131
          C -0.226872       NaN -0.591928       NaN
    two   A       NaN  1.036402       NaN -0.208765
          B  0.616765       NaN  0.081840       NaN
          C       NaN -1.930620       NaN -1.447337
    
    C                bar       foo
    F
    2013-01-31       NaN -0.162126
    2013-02-28       NaN  0.010862
    2013-03-31       NaN -1.930620
    2013-04-30  0.005916       NaN
    2013-05-31 -0.003410       NaN
    2013-06-30  0.481945       NaN
    2013-07-31       NaN  1.036402
    2013-08-31       NaN -0.535911
    2013-09-30       NaN -1.598931
    2013-10-31 -0.390408       NaN
    2013-11-30  0.616765       NaN
    2013-12-31 -0.226872       NaN
    
                    D                   E
    C             bar       foo       bar       foo
    A     B
    one   A -0.390408 -0.162126 -0.683562  0.026260
          B -0.003410  0.010862 -0.415946  0.200125
          C  0.481945 -1.598931 -0.268773  1.373700
    three A  0.005916           -0.262943
          B           -0.535911            0.425131
          C -0.226872           -0.591928
    two   A            1.036402           -0.208765
          B  0.616765            0.081840
          C           -1.930620           -1.447337
    
                    D                             E
    C             bar       foo       All       bar       foo       All
    A     B
    one   A  0.283784  1.409851  0.840699  0.237509  0.416961  0.494677
          B  0.171411  0.484967  0.297085  0.608044  0.741761  0.658146
          C  0.755417  0.200819  1.283359  0.142337  0.202589  0.958996
    three A  1.130542       NaN  1.130542  0.550971       NaN  0.550971
          B       NaN  0.068946  0.068946       NaN  0.427263  0.427263
          C  0.368006       NaN  0.368006  0.572108       NaN  0.572108
    two   A       NaN  0.162156  0.162156       NaN  0.692568  0.692568
          B  1.012479       NaN  1.012479  0.112784       NaN  0.112784
          C       NaN  0.233414  0.233414       NaN  1.044817  1.044817
    All      0.651877  1.140991  0.940582  0.408882  0.998514  0.759845
    
                        D         E
    A     B C
    one   A All  0.840699  0.494677
            bar  0.283784  0.237509
            foo  1.409851  0.416961
          B All  0.297085  0.658146
            bar  0.171411  0.608044
            foo  0.484967  0.741761
          C All  1.283359  0.958996
            bar  0.755417  0.142337
            foo  0.200819  0.202589
    three A All  1.130542  0.550971
            bar  1.130542  0.550971
          B All  0.068946  0.427263
            foo  0.068946  0.427263
          C All  0.368006  0.572108
            bar  0.368006  0.572108
    two   A All  0.162156  0.692568
            foo  0.162156  0.692568
          B All  1.012479  0.112784
            bar  1.012479  0.112784
          C All  0.233414  1.044817
            foo  0.233414  1.044817
    All     All  0.940582  0.759845
            bar  0.651877  0.408882
            foo  1.140991  0.998514
    

2.10.6. pivot tables(old)

melb_df.groupby(['Rooms', 'Type'])['Price'].mean() # иерархические индексы
melb_df.groupby(['Rooms', 'Type'])['Price'].mean().unstack() # раскладывает таблицу в столбцы
melb_df.pivot_table(
    values='Price',
    index='Rooms',
    columns='Type',
    fill_value=0
).round() # аналогично второму

2.10.7. crosstab - frequencies

frequency table of the factors unless an array of values and an aggregation function are passed.

import pandas as pd
import numpy as np
foo, bar, dull, shiny, one, two = "foo", "bar", "dull", "shiny", "one", "two"
a = np.array([foo, foo, bar, bar, foo, foo], dtype=object)
b = np.array([one, one, two, one, two, one], dtype=object)
c = np.array([dull, dull, shiny, dull, dull, shiny], dtype=object)
print("frequencies:")
print(pd.crosstab(a, b))
print()
print(pd.crosstab(a, [b, c], rownames=["a"], colnames=["b", "c"]))
frequencies:
col_0  one  two
row_0
bar      1    1
foo      3    1

b    one        two
c   dull shiny dull shiny
a
bar    1     0    0     1
foo    2     1    1     0

2.10.8. cut - transform continuous variables to discrete or categorical variables

import pandas as pd
import numpy as np
ages = np.array([10, 15, 13, 12, 23, 25, 28, 59, 60])
print(pd.cut(ages, bins=3))
print()
print(pd.cut(ages, bins=[0, 18, 35, 70]))
[(9.95, 26.667], (9.95, 26.667], (9.95, 26.667], (9.95, 26.667], (9.95, 26.667], (9.95, 26.667], (26.667, 43.333], (43.333, 60.0], (43.333, 60.0]]
Categories (3, interval[float64, right]): [(9.95, 26.667] < (26.667, 43.333] < (43.333, 60.0]]

[(0, 18], (0, 18], (0, 18], (0, 18], (18, 35], (18, 35], (18, 35], (35, 70], (35, 70]]
Categories (3, interval[int64, right]): [(0, 18] < (18, 35] < (35, 70]]

2.10.9. dummies

  • pd.get_dummies(df, prefix="new_prefix")
  • pd.from_dummies(df, sep="_")

2.10.10. factorize - categories to numbers

import pandas as pd
import numpy as np
x = pd.Series(["A", "A", np.nan, "B", 3.14, np.inf])
labels, uniques = pd.factorize(x)
print(labels)
print(uniques)
[ 0  0 -1  1  2  3]
Index(['A', 'B', 3.14, inf], dtype='object')

2.10.11. explode

import pandas as pd
import numpy as np
keys = ["panda1", "panda2", "panda3"]
values = [["eats", "shoots"], ["shoots", "leaves"], ["eats", "leaves"]]
df = pd.DataFrame({"keys": keys, "values": values})
print(df)
print()
print(df["values"].explode())
print()
print(df.explode("values"))
     keys            values
0  panda1    [eats, shoots]
1  panda2  [shoots, leaves]
2  panda3    [eats, leaves]

0      eats
0    shoots
1    shoots
1    leaves
2      eats
2    leaves
Name: values, dtype: object

     keys  values
0  panda1    eats
0  panda1  shoots
1  panda2  shoots
1  panda2  leaves
2  panda3    eats
2  panda3  leaves

2.10.12. assign and explode - split values to rows

import pandas as pd
import numpy as np
df = pd.DataFrame([{"var1": "a,b,c,d", "var2": 1}, {"var1": "d,e,f", "var2": 2}])
print(df)
print()
print(df.assign(var1=df.var1.str.split(",")).explode("var1"))
      var1  var2
0  a,b,c,d     1
1    d,e,f     2

  var1  var2
0    a     1
0    b     1
0    c     1
0    d     1
1    d     2
1    e     2
1    f     2

2.11. Merge, join, and concatenate

https://pandas.pydata.org/pandas-docs/stable/user_guide/merging.html

Одну таблицу разделенную на две части:

  • верх и низ: pd.concat([s1, s2], ignore_index=True)
  • лево и право ?
  • concatenate - по умолчанию добавляются строки, default: axis=0, join='outer', ignore_index = False
    • pd.concat([df1, df4], axis=1, sort=False) - подбираются столбцы с одинаковым значением, добавляются NaN-s
    • join='outer' - NaN-s не добавляются

SQL style

  1. merge - ignore index, uses specified column
    • pd.merge(playdata, genetic_train, on="SK_ID_CURR",how="left" ) - если есть дупликаты справа, то они все войдут даже справа
    • "on" must be found in both DataFrames
    • indicator=True - adds _merge field with ['left_only', 'right_only', 'both']
  2. join - uses index column
    • first you should set index to joined columns
    • table1.join(table2, lsuffix='_table1', rsuffix='_table2',how="left")

new column:

df['asd'] = list

2.11.1. concat series

>>> df
   0
0  1
2  3
>>> df2
   0
0  1
1  2
>>> pd.concat([df,df2], axis=1)
     0    0
0  1.0  1.0
2  3.0  NaN
1  NaN  2.0

import pandas as pd
s1 = pd.Series(['a', 'b'])
s2 = pd.Series(['c', 'd'])
print(pd.concat([s1, s2], ignore_index=True))
0    a
1    b
2    c
3    d
dtype: object

2.11.2. concat datafremes vertically

import pandas as pd
df1 = pd.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'],

                    'value': [1, 2, 3, 5]})

df2 = pd.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'],

                    'value': [5, 6, 7, 8]})
print(df2)
print(pd.concat([df1, df2], ignore_index=True))
  rkey  value
0  foo      5
1  bar      6
2  baz      7
3  foo      8
  lkey  value rkey
0  foo      1  NaN
1  bar      2  NaN
2  baz      3  NaN
3  foo      5  NaN
4  NaN      5  foo
5  NaN      6  bar
6  NaN      7  baz
7  NaN      8  foo

2.11.3. merge

import pandas as pd
left = pd.DataFrame(
    {
        "key": ["K0", "K1", "K2", "K3"],
        "A": ["A0", "A1", "A2", "A3"],
        "B": ["B0", "B1", "B2", "B3"],
    }
)

right = pd.DataFrame(
    {
        "key": ["K0", "K1", "K2", "K3", "K0"], # K0 duplicate
        "C": ["C0", "C1", "C2", "C3", "C3"],
        "D": ["D0", "D1", "D2", "D3", "D3"],
    }
)

result = pd.merge(left, right, on="key", how='left')
print(result)
  key   A   B   C   D
0  K0  A0  B0  C0  D0
1  K0  A0  B0  C3  D3
2  K1  A1  B1  C1  D1
3  K2  A2  B2  C2  D2
4  K3  A3  B3  C3  D3

2.11.4. add by date

def add_holiday_features(df, dfh):
    df['date'] = df['pickup_datetime'].dt.date
    df['date'] = df['date'].astype(str)
    df = df.merge(dfh, 'left', on='date')
    df['holiday'].fillna(0, inplace=True)
    df['holiday'] = df['holiday'].apply(lambda x: 1 if x != 0 else 0)
    df.drop(columns=['date'], inplace=True)
    return df

2.12. DISTICT groupby

print(df.groupby('shop_id').item_id.value_counts())
print(df.groupby('shop_id').item_id.nunique())

dfg = df[['shop_id', 'item_id'] ].groupby('shop_id')
print(dfg.agg(['mean', 'count', 'min']))

2.12.1. row number by group - добавить сложную номерацию по группам

df['Номер_контракта'] = df.groupby(['Клиент'])['Дата_заключения_контракта'].cumcount()+1

2.13. two dataframes

2.13.1. sets comparision

def count_fkey(key1, key2):
    un1 = np.unique(key1)
    un2 = np.unique(key2)
    cm = np.in1d(un1, un2, assume_unique=True)
    if 'name' in dir(key1):
        print(f"Unique [{key1.name}]: { un1.size}")
        print(f"Unique [{key2.name}]: { un2.size}")
    else:
        print(f"key1: { un1.size}")
        print(f"key2: { un2.size}")
    c = np.unique(cm, return_counts=True)
    print(pd.DataFrame({'values':c[0], 'count':c[1]}))

2.14. Map, Apply, Applymap

2.14.1. Comparing map, applymap and apply: Context Matters

First major difference: DEFINITION

  • map is defined on Series ONLY
  • applymap is defined on DataFrames ONLY
  • apply is defined on BOTH

Second major difference: INPUT ARGUMENT

  • map accepts dicts, Series, or callable
  • applymap and apply accept callables only

Third major difference: BEHAVIOR

  • map is elementwise for Series
  • applymap is elementwise for DataFrames
  • apply also works elementwise but is suited to more complex operations and aggregation. The behaviour and return value depends on the function.

Fourth major difference (the most important one): USE CASE

map is meant for mapping values from one domain to another, so is optimised for performance (e.g., df['A'].map({1:'a', 2:'b', 3:'c'}))
applymap is good for elementwise transformations across multiple rows/columns (e.g., df[['A', 'B', 'C']].applymap(str.strip))
apply is for applying any function that cannot be vectorised (e.g., df['sentences'].apply(nltk.sent_tokenize))

Footnotes

  • map when passed a dictionary/Series will map elements based on the keys in that dictionary/Series. Missing values will be recorded as NaN in the output.
  • applymap in more recent versions has been optimised for some operations. You will find applymap slightly faster than apply in some cases. My suggestion is to test them both and use whatever works better. (deprecated)
  • map is optimised for elementwise mappings and transformation. Operations that involve dictionaries or Series will enable pandas to use faster code paths for better performance.
  • Series.apply returns a scalar for aggregating operations, Series otherwise. Similarly for DataFrame.apply. Note that apply also has fastpaths when called with certain NumPy functions such as mean, sum, etc.

2.14.2. apply to column

df['A'] = df['A'].apply(lambda x: str.strip(x) if pd.notna(x) else x)

2.14.3. return multiple rows

return pd.Series([1,2,3]) ; df['a'].apply(f).to_numpy()[:,1] - time 13 sec
return [1,2,3] ; list(zip(*df['a'].apply(f).to_list()) - time 28.6 sec

2.14.4. example

s.map('I am a {}'.format)
s.map({' <=50K.': 0, ' >50K.': 1})
s.map({'fox': 'cub', 'cow': 'calf'})
df['result'] = df['result'].map({b'OK': 1, b'STOP': 0})
df.iloc[:, 0] = df.iloc[:, 0].map({b'OK': 1, b'STOP': 0})

DataFrame.applymap(self, func) # to whole dataFrame

DataFrame.apply(self, func, axis=0, raw=False, result_type=None, args=(), **kwds)

Series.map(self, arg, na_action=None) # argfunction, collections.abc.Mapping subclass or Series

df.iloc[:, 2].map(lambda x: x*x) == df.iloc[:, 2].apply(lambda x: x*x)

2.15. save and load

df.to_pickle('b')
df: pandas.DataFrame = pandas.read_pickle('b')

2.15.1. read_csv

 # Имена переменных
 columns = ['age', 'workclass', 'fnlwgt', 'education', 'education-num',
            'marital-status', 'occupation', 'relationship', 'race', 'sex',
            'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'income']
 df = pd.read_csv('adult.data', header=None, names=columns, na_values=' ?')

2.15.2. json

pd.read_json('test_data.txt') - {"Клиент":"customer_3567","Дата_заключения_контракта":"2018-05-12","Дата_закрытия_контракта":"2018-06-13","Плановая_дата_закрытия_контракта":"2018-06-13","Сумма_выдачи_по_контракту":21891},{"Клиент":"customer_39200","Дата_заключения_контракта":"2019-03-29","Дата_закрытия_контракта":"2019-04-05","Плановая_дата_закрытия_контракта":"2019-04-05","Сумма_выдачи_по_контракту":11480},{"Клиент":"customer_26509","Дата_заключения_контракта":"2019-03-29","Дата_закрытия_контракта":"2019-04-30","Плановая_дата_закрытия_контракта":"2019-04-28","Сумма_выдачи_по_контракту":2640},{"Клиент":"customer_26623","Дата_заключения_контракта":"2019-03-06","Дата_закрытия_контракта":"2019-03-29","Плановая_дата_закрытия_контракта":"2019-04-06","Сумма_выдачи_по_контракту":25038},{"Клиент":"customer_14647","Дата_заключения_контракта":"2019-03-29","Дата_закрытия_контракта":"2019-04-15","Плановая_дата_закрытия_контракта":"2019-04-15","Сумма_выдачи_по_контракту":6369},{"Клиент":"customer_29658","Дата_заключения_контракта":"2019-12-05","Плановая_дата_закрытия_контракта":"2019-12-27","Сумма_выдачи_по_контракту":24172},{"Клиент":"customer_37798","Дата_заключения_контракта":"2019-11-18","Дата_закрытия_контракта":"2019-12-05","Плановая_дата_закрытия_контракта":"2019-12-18","Сумма_выдачи_по_контракту":9867},

2.16. NaN

выбрать

  • df.loc[df.index.isnull()]

2.16.1. check

  • df.isnull().values.any() # true or false
  • df.isnull().sum() # кол-во по столбцам
  • df.hasna - # true or false

2.16.2. replace

  • df.dropna(subset=['column_name'], inplace=True)
  • df['col'].fillna(0, inplace=True)

2.16.3. drop

df.dropna(subset=['col1', 'col2'],inplace=True) # remove rows if NaN in col1 or col2 column

2.16.4. get not na

df = df[~df['col'].isna()]

2.16.5. other

# MEAN
from sklearn.preprocessing import Imputer
# Define the values to replce and the strategy of choosing the replacement value
imp = Imputer(missing_values="NaN", strategy="mean")
cols = [1, 13]
df[cols] = imp.fit_transform(applicants[cols])

# REMOVE string -> NaN
applicants[cols] = applicants[cols].apply(pd.to_numeric, errors='coerce')

2.17. Categorical encoding

2.17.1. replace values

df['a'] = df['a'].map({b'OK': 1, b'STOP': 0})

replace date:

def repl_date(df_in: DataFrame):
    df = df_in.copy()  # no side effect
    for i, x in enumerate(df.iloc[0, :]):
        if isinstance(x, date):
            # print(i, type(x))
            cname = df.columns[i]
            df[cname] = df[cname].map(lambda x: x.year)
    return df

2.17.2. label encoding

for c in label_e_columns:
    df[c] = df[c].astype('category').cat.codes

# get  velues before encoding
print(dict(enumerate(df[c].astype('category').cat.categories)))

2.17.3. encode binary

df['income'] = df['income'].map({' <=50K': 0, ' >50K': 1})
df['income'] = df['income'].notnull().astype(int)

2.17.4. onehot encode

df = pd.get_dummies(df, dummy_na=False)  # dummy_na=True for debug

s = pd.Series(list('abca'))
pd.get_dummies(s)
   a  b  c
0  1  0  0
1  0  1  0
2  0  0  1
3  1  0  0

2.18. mem usage

#Great snippet from https://www.kaggle.com/gemartin/load-data-reduce-memory-usage
def reduce_mem_usage(df):
    """ iterate through all the columns of a dataframe and modify the data type
        to reduce memory usage.
    """
    start_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage of dataframe is {:.2f} MB'.format(start_mem))

    for col in df.columns:
        col_type = df[col].dtype

        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
        #else:
        #    df[col] = df[col].astype('category')

    end_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
    print('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))

    return df

2.19. rename column

df.columns.str.replace("original_column", "APP_SRC_REF")

may rename several columns!

  • ('doggod', 'god')
  • df.columns.str.replace("god", "war")
  • ('dogwar', 'war')
df.rename(columns={"0":"0col", "1": "1col", 2:"2col", 3:"3col"}, inplace=True)

2.20. delete column

  1. df.drop('education', axis=1, inplace=True)
  2. df.drop(['education', 'fabrication'], axis=1, inplace=True)

or

  • df.drop(columns=['education', 'fabrication'], inplace=True)
  • df.drop(df.iloc[:,1:3], axis=1)
  • del df['education']

2.21. delete row

2.21.1. delete NA

df.dropna(axis='index', subset=['column1'])
for x in ['sd', 'a2']:
  ids = df.index[(df["code"] == x) & (df["something"] == 1)]
  if len(ids) != 0:
     df.drop(ids, inplace=True)

2.21.2. delete values that is in other df column

import pandas as pd
df1 = pd.DataFrame(data = {'col1' : [1, 2, 3, 4, 5, 3],
                           'col2' : [10, 11, 12, 13, 14, 10]})
df2 = pd.DataFrame(data = {'col1' : [1, 2, 3],
                           'col2' : [10, 11, 12]})
print(df1)
print(df2)
df_all = df1.merge(df2.drop_duplicates(), on=['col1','col2'],
                   how='left', indicator=True)
print(df_all)
print(df_all[df_all['_merge'] == 'left_only'])
   col1  col2
0     1    10
1     2    11
2     3    12
3     4    13
4     5    14
5     3    10
   col1  col2
0     1    10
1     2    11
2     3    12
   col1  col2     _merge
0     1    10       both
1     2    11       both
2     3    12       both
3     4    13  left_only
4     5    14  left_only
5     3    10  left_only
   col1  col2     _merge
3     4    13  left_only
4     5    14  left_only
5     3    10  left_only

2.22. type

automatic types

error= {‘ignore’, ‘raise’, ‘coerce’}, default ‘raise’

  • ignore - invalid parsing will return the input
  • coerce - invalid parsing will be set as NaN.

2.22.1. types https://numpy.org/doc/stable/reference/arrays.scalars.html

Pandas dtype Python type NumPy type
object str or mixed string_, unicode_, mixed types
Int64/Int32 int int_, int8, int16, int32, int64, uint8, uint16, uint32, uint64
float64 float float_, float16, float32, float64
bool bool bool_
boolean allow NaN ?
datetime64 NA datetime64[ns]
timedelta[ns] NA NA
category NA NA

2.22.2. Display types

print(df1.dtypes)
categorial_columns = df.select_dtypes(include=["object"]).columns
numerical_columns = df.select_dtypes(exclude=["object"]).columns
print(data[categorial_columns].describe())
# unique
for c in categorial_columns:
   print(c, data[c].unique())

2.22.3. float to int

with NaN

df['col'] = df['col'].round().astype('Int32')

without NaN

  1. drop or fill NaN
  2. df['col'] = df['col'].round().astype(int)

2.22.4. string to date

df['col1'] = pd.to_datetime(df['col1'])
df['Дата рождения клиента'] = pd.to_numeric(2021 - pd.to_datetime(df['Дата рождения клиента']).dt.year).astype('Int32')

2.22.5. Category type

object string to category:

  • .astype("category")

2.23. if a>5 c = True else False

https://datatofish.com/if-condition-in-pandas-dataframe/

df.loc[df['set_of_numbers'] <= 4, 'flag'] = 'True'
df['flag'].fillna(False,inplace=True)

2.24. OTHER USE CASES

2.24.1. dictionary for panda

def list_to_dict(dicts: list) -> dict:
    """
    from [{col1':1, col2':3},
            {col1':2, col2':4}]
    to {'col1': [1, 2], 'col2': [3, 4]}

    :param dicts: list of dicts
    :return: dictionary for pandas
    """

    d = {}  # target {'col1': [1, 2], 'col2': [3, 4]}
    for k in dicts[0].keys():
        d[k] = []

    for x in dicts:
        for k in dicts[0].keys():
            d[k].append(x[k])
    return d

2.24.2. Example from dictionary to onehot

def list_to_dict(dicts: list) -> dict:
    """
    from [{col1':1, col2':3},
            {col1':2, col2':4}]
    to {'col1': [1, 2], 'col2': [3, 4]}

    :param dicts: list of dicts
    :return: dictionary for pandas
    """

    d = {}  # target {'col1': [1, 2], 'col2': [3, 4]}
    for k in dicts[0].keys():
        d[k] = []

    for x in dicts:
        for k in dicts[0].keys():
            d[k].append(x[k])
    return d


def repl_date(df_in: DataFrame):
    df = df_in.copy()  # no side effect
    for i, x in enumerate(df.iloc[0, :]):
        if isinstance(x, date):
            # print(i, type(x))
            cname = df.columns[i]
            df[cname] = df[cname].map(lambda x: x.year)
    return df


def one_hot_p(dicts: list):
    d = list_to_dict(dicts)
    df = pd.DataFrame(d)
    df.iloc[:, 0] = df.iloc[:, 0].map({b'OK': 1, b'STOP': 0})
    df = repl_date(df)
    # print(df.to_string())
    df2 = pd.get_dummies(df)
    return df2

2.24.3. remove meanless columns

df.fillna(0)
for x in df.iloc[:]:
    if df[x].min() == df[x].max():
        del df[x]

2.24.4. Sum two columns containing NaN values

total = df['Jan'] + df['Feb'].fillna(0)

2.24.5. reorder columns

# firest
target = df.pop('first_decision_state')
df.insert(1, 'first_decision_state', target)

# second
cols = df.columns.tolist()
cols = cols[-1:] + cols[:-1] # last to first
df = df[cols]

2.24.6. TODO remove duplicates

  • df.sort_values(by=['id', 'completed_at'], na_position='first')
  • df.drop_duplicates('id', keep='last')

2.24.7. replace missing values by groups

df["value"] = df.groupby("name").transform(lambda x: x.fillna(x.mean()))
df.reset_index(inplace=True, drop=True)
shit_cols = ['pickup_day_of_week', 'geo_cluster', 'events']
shits = []
for shit in shit_cols:
    shits.append(pd.get_dummies(df[shit], prefix=shit, drop_first=True))
    print(pd.get_dummies(df[shit], prefix=shit))

shits = pd.concat(shits, axis=1)
print(shits.head())
print("Сколько бинарных столбцов у вас получилось сгенерировать с помощью однократного кодирования?\n",
      len(shits.columns))
# ['pickup_day_of_week_1', 'pickup_day_of_week_2', 'pickup_day_of_week_3', 'pickup_day_of_week_4', 'pickup_day_of_week_5', 'pickup_day_of_week_6', 'geo_cluster_1', 'geo_cluster_2', 'geo_cluster_3', 'geo_cluster_4', 'geo_cluster_5', 'geo_cluster_6', 'geo_cluster_7', 'geo_cluster_8', 'geo_cluster_9', 'events_None', 'events_Rain', 'events_Snow']
df = pd.concat([df.drop(columns=shit_cols), shits], axis=1)

2.24.8. add count of occurences column

df['count'] = df.groupby('Col1')['Col1'].transform('size')

2.25. troubleshooting

df['binary'][0] = 23

SettingWithCopyWarning: rewrite:

df.loc[0, 'binary'] = 23
df.loc[:, c] = pd.Series([2,3,4,])

3. h5py

emerge dev-python/h5py

Groups work like dictionaries, and datasets work like NumPy arrays.

3.1. Dataset object

datasets support operations:

  • compression
  • error-detection
  • chunked I/O

attributes:

  • shape
  • size
  • ndim
  • dtype
  • nbytes

3.2. terms

datasets
array-like collections of data
groups
folder-like containers that hold datasets and other groups

3.3. open

  • h5py.File() - acts like a Python dictionary

3.4. usage

import h5py

f = h5py.File('mytestfile.hdf5', 'r')

4. DVC

fetch data from external, codify data/models and reproducible pipelines.

4.1. features:

  • allow to download data from supported sources and keep hash of files.
  • versioning through codification - metafiles describing: datasets, ML artifacts, etc. to track.
  • allow to create pipiline, fix input and outputs, allow to avoid reruns.
  • DVCLive tool for experiment tracking
  • allow to create development server with shared and cached data, chached data may be shared between projects.

allow

  • Data validation: for example, validation against a schema or verifying pipeline consistency — correct shapes, data types, etc.
  • Model validation: for example, input/output and performance validation — all dependencies present for inference to run, and model scores within thresholds.

4.2. problem

to track and storing it in Git

  • large datasets
  • machine learning models - binary

4.3. terms

data registry
git + dvc repository - for versioning of data and model files. The data itself is stored in one or more DVC remotes
DVC remotes
similar to Git remotes, used with dvc push and dvc pull commands. To add: dvc remote to .dvc/config.
stage
processing step of pipeline. allow connecting code to its corresponding data input/dependencies and output.
dependencies
input for a stage. specified as paths in the dev field of ".dvc". Stages are invalidated (considered outdated) when any of their dependencies change.
output
result of stage, tracked by DVC.
parameters
granular dependencies of stage, such as "batch size", DVC can track any key/value pair in a supported parameters file (params.yaml by default)
metrics
feature of "experiments" - allow compare results.
cache
hidden storage .dvc/cache

4.4. steps

  • dvc init # running inside a Git project
  • git commit -m "dvc init"

4.4.1. data:

way 1) git source

  • looks like it download file: dvc get https://github.com/iterative/dataset-registry get-started/data.xml -o data/data.xml
  • dvc add to start tracking the dataset file. create: data/data.xml.dvc. Same to git add.
  • git add data/data.xml.dvc data/.gitignore
  • git commit -m "Add raw data"

way 2) local directory

  • mkdir /tmp/dvcstore
  • dvc remote add -d myremote /tmp/dvcstore

Now we have

  1. file data/data.xml
  2. in .gitignore record for this file
  3. data/data.xml.dvc - hash

dvc checkout to sync data into your workspace

4.4.2. pipelines

abstract:

  1. virtualenv venv && echo "venv" > .gitignore
  2. source venv/bin/activate
  3. pip install -r src/requirements.txt

actual: .4) Create stage:

dvc stage add -n prepare \
                -p prepare.seed,prepare.split \
                -d src/prepare.py -d data/data.xml \
                -o data/prepared \
                python src/prepare.py data/data.xml

generate dvc.yaml file, it have:

  • command that will be run: python src/prepare.py data/data.xml
  • -d - for dependencies
  • -o - output
  • -p - parameter, such as "batch size"

.5) dvc repro - run the pipeline. dvc.lock (a "state file") was created to capture the reproduction's results, that should be added to git.

  • automatically determines which parts of a project need to be run

.6) we can use dvc stage add -d data/prepared - to create chain.

.7) dvc dag - visualize chain of stages .8) dvc params diff - show differences between iterations of pipeline. also there is metrcis diff and ptots diff

4.5. CML - Continuous Machine Learning

orchestration, testing and monitoring.

  • manage ML experiments, track who trained ML models or modified data and when.
  • Auto-generate reports with metrics and plots
  • Build your own ML platform using just GitHub or GitLab and your favorite cloud services: AWS, Azure, GCP, or Kubernetes. No databases, services or complex setup needed.

links

5. matplotlib

5.1. base

ax: Axes = None
fig, ax = plt.subplots(1,1, figsize=(19,10))
plt.subplots_adjust(left=0.076, right=0.96, bottom=0.04, top=0.96, wspace=0.30, hspace=0.7) # if more than one
plt.plot(.., legend='line1')
title="graph"
fig.suptitle('test title', fontsize=20)
plt.suptitle('test title', fontsize=20) #?
plt.title('Title!', {'fontsize':20})
plt.rc('font', size=6) # set font size
plt.legend() # add line descriptions
fig.subplots_adjust(left=0.4, bottom=0.4)
plt.tight_layout() # corret top, left, bottom, right automatic
plt.show() # or plt.savefig('name')
plt.savefig(title)
# horizontal line
plt.axhline(y = 2, color = 'r', linestyle = 'dashed', label = "red line")
# vertical line
plt.axvline(x = 7, color = 'b', label = 'axvline - full height')

plt.close()

plt.yticks(range(1,10)) # шкала слева
as.set_xlim(left=3) # шкалировать от 3

5.2. subplot or multiple diagram in one window

import matplotlib.pyplot as plt
fig = plt.figure(figsize=(2,2))
d1: AxesSubplot = fig.add_subplot(1, 2, 1)   #1 row 2 columns - left
d2: AxesSubplot = fig.add_subplot(2, 2, 2)   #2x2 - top right
d3: AxesSubplot = fig.add_subplot(2, 2, 4)   #2x2 - bottom right
plt.show()

d: AxesSubplot = fig.add_subplot(121)   # equal to 1, 2, 1

fig.tight_layout() # create spaces to allow set_title for graphics

# -- define grid more precisely with rations
# gs = fig.add_gridspec(nrows=2, ncols=2,
#                       width_ratios=((1,)), # ncols length
#                       height_ratios=(1,1), # nrows
#                       left=0.1, right=0.1, bottom=0.1, top=0.9,
#                       wspace=0.1, hspace=0.1)
# ax = fig.add_subplot(gs[1, 0])
# ax.hist(x, bins=bins1)

5.3. x axis labels range

import matplotlib.ticker as plticker
loc = plticker.MultipleLocator(base=50)
ax.xaxis.set_major_locator(loc)

5.4. Matplotlib is currently using agg, which is a non-GUI backend, so cannot show the figure.

matplotlib.use

5.4.1. TkAgg

import matplotlib
matplotlib.use('TkAgg')

Tkinter is a Python binding to the Tk GUI toolkit. It is the standard Python interface to the Tk GUI toolkit, and is Python's de facto standard GUI.

Gentoo: included with standard Linux

Gentoo: USE="tk"

5.4.2. GTK3Agg

Xfce4 - GTK-based

  • find out GTK version: dpkg -l libgtk* | grep -e '^i' | grep -e 'libgtk-*[0-9]'
  • find out glib version: ldd –version
  • apt install libglib2.0-dev
  • apt install libgirepository1.0-dev
  • apt install libcairo2-dev
  • apt install python3-dev
  • pip install pycairo
  • apt-get install libgtk-3-dev
  • pip3 install PyGObject –user
import matplotlib
matplotlib.use('GTK3Agg')

5.5. usage

from matplotlib import pyplot as plt

# time sequence
plt.plot(range(len(a)), a)
plt.show()

# time sequence - голубыми Точками
plt.plot(range(len(a)), a, 'bo')
plt.show()

# Histogram - distribution of numerical data
# бакет - дискретный интервал разбиения
N = 100
noise = np.random.normal(loc=0.0, scale=1.0, size=(N, 1))
plt.hist(noise, bins='auto', density=True)
plt.show()


# Scatter - y=f(x) в виде точек, где x не по порядку.
plt.scatter(x_np, y_rows)
plt.show()

# В виде линии
res = sorted(zip(x_np,y_rows) , key=lambda k: k[0]) # сортируем по x
x, y = zip(*res) # unzip
plt.plot(x, y)
plt.show()


#matr_my - shape=(50,512) - value=[0;1] в виде спектра.

plt.pcolormesh(matr_my, cmap='RdBu')
plt.xlabel('Depth')
plt.xlim((0, 512))
plt.ylabel('Position')
plt.colorbar()
plt.show()

5.6. do not close

plt.close()
plt.plot()
plt.draw()
plt.pause(0.0001)

5.7. Multiple Curves

import matplotlib.pyplot as plt
x = [0,1,2,3,4]
y1 = [2,3,5,7,8]
y2 = [2, 3, 7, 7, 8]
plt.plot(x, y1, label = "1")
plt.plot(x, y2, label = "2")
plt.show()

5.8. two windows with separate legend

x = [0, 1, 2, 3, 4]
y1 = [2, 3, 5, 7, 5]
y2 = [2, 3, 7, 7, 8]

import matplotlib.pyplot as plt
plt.figure()
ax = plt.gca()
plt.plot(x, y1, label="1")
plt.plot(x, y2, label="2")

plt.figure()
plt.plot(x)
plt.figlegend(*ax.get_legend_handles_labels(), loc='upper left')
plt.show()

5.9. custom histogram

# get hist
counts, edges = np.histogram(A, bins=10, range=(0,10))

bincenters = 0.5 * (edges[1:] + edges[:-1])
spline = make_interp_spline(bincenters, counts, k=k)

# that is how to loop edges
for pair in zip(binEdges[:-1], binEdges[1:]):
        low, high = pair

# back to data
A = np.repeat(edges[:-1], counts)

5.10. rotate x ticks

plt.xticks(rotation=10)

5.11. CASES

5.11.1. TODO bar plot with two y axes

5.11.2. varible in time

plt.plot_date(df['date'],df['x])
plt.show

6. SciPy

adds more MATLAB-like functionality and Matplotlib is a plotting package that provides MATLAB-like plotting functionality

6.1. hierarchical lustering

6.1.1. distance and squareform

pdist - Pairwise distances between observations

>> array([0., 2., 2.])

squarefor - returns a symmetric matrix where Z(i,j) corresponds to the pairwise distance between observations i and j

dist:

from scipy.spatial.distance import squareform
from scipy.spatial.distance import pdist
d = pdist([[1,2],[1,2], [3,2]])
print(d)
print()
sq = squareform(d)
print(sq)

here: [0. 0. 2.] (1) - distances between first observation and first, second, third observation

6.1.2. linkage

[print(i+len(df), x) for i, x in enumerate(l)]

At the i-th iteration, clusters with indices Z[i, 0] and Z[i, 1] are combined to form cluster n + i.

  • i-th row - iteration
  • 0 and 1 - cluster numbers or observation number if x<n
  • 2 - is a distance between 0 and 1
  • Z[i, 3] represents the number of original observations in the newly formed cluster

format:

6.1.3. dendrogram

to see count of observatins in clusters - set truncate_mode='level' and p=1.1 to level.

from matplotlib import pyplot as plt
dendrogram(Z=l, p=1.1, truncate_mode='level', labels=df.index, count_sort=False, distance_sort=False, orientation='right', leaf_font_size=15)
plt.show()

6.1.4. cophentic correlation

pearson correlation

7. Scikit-learn

  • based on numpy and SciPy
  • scikit-learn can be classified as a tool in the "Machine Learning Tools" category, while SciPy is grouped under "Data Science Tools".

7.1. history

  • 2007 begin
  • 2010 first release

7.2. fast feature selection

from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import f_regression # or chi
selector = SelectKBest(f_regression, k=25)
X_new = selector.fit_transform(X, y)
names = X.columns.values[selector.get_support()]
scores = selector.scores_[selector.get_support()]
names_scores = list(zip(names, scores))
print("Укажите признаки, которые вошли в список отобранных:")
[print(x) for x in names_scores]

7.3. sklearn.tree.DecisionTreeClassifier

  1. the algorithm chooses a feature and makes a split
  2. looks at the subsets and measures their impurity using the (gini,entropy) score (impurity)
  3. for multiple thresholds and determines that the best split for the given feature
  4. repeat for all features and nodes
  5. from root to leaves

7.3.1. usage

test = 0  # matrix.shape[0] // 3
train = int(matrix.shape[0] - test)

data_train = matrix[:train, 1:].copy()  # 11 column - labels
labels_train = matrix[:train, 0].copy()  # 11 column - labels
# print(labels_train)
data_test = matrix[train:, 1:].copy()  # 11 column - labels
labels_test = matrix[train:, 0].copy()  # 11 column - labels

print(data_train.shape)
print(data_test.shape)
print(labels_train.shape)

models = []
# DecisionTreeClassifier ------------------------------
from sklearn.tree import DecisionTreeClassifier

data_train[np.isnan(data_train)] = -1  # replace nan
data_train_orig = data_train.copy()

model = DecisionTreeClassifier(random_state=42,
                                   # функция для impurity ('gini' или 'entropy')
                                   criterion='gini',
                                   # максимальная глубина дерева
                                   max_depth=3,
                                   # минимальное число элементов в узле для разбиения (может быть долей)
                                   min_samples_split=5,
                                   # минимальное число элементов в листе (может быть долей)
                                   min_samples_leaf=2,
                                   # минимальное значение дельты impurity
                                   # min_impurity_decrease=0,
                                   # веса для классов (можно дополнительно штрафовать за ошибку в нужных классах).
                                   # поддерживает опцию 'balanced'.
                                   class_weight=None,
                                   # предварительная сортировка.
                                   # ускоряет обучение на данных небольшого размера или с ограниченной глубиной дерева.
                                   # иначе замедляет обучение.
                                   presort=False
                                   )

    # Обучаем модель
    data_train[np.isnan(data_train)] = -1
    model.fit(data_train, labels_train)

    # delete feature
    parent_feature = model.feature_importances_.argmax()  # 0...
    print(parent_feature)
    data_train[:, parent_feature] = np.zeros(data_train.shape[0])  # (0...

    from IPython.display import Image
    from sklearn.tree import export_graphviz
    from subprocess import call

    export_graphviz(model,
                    out_file='tree.dot',
                    # задать названия фич
                    # feature_names=X.columns,
                    class_names=None,
                    # показывать названия полей у численных значений внутри узла
                    label='all',
                    # раскрашивать узлы в цвет преобладающего класса
                    filled=True,
                    # показывать значение impurity для каждого узла
                    impurity=True,
                    # показывать номера узлов
                    node_ids=True,
                    # Показывать доли каждого класса в узлах (а не количество)
                    proportion=True,
                    # Повернуть дерево на 90 градусов (вертикальная ориентация)
                    rotate=False,
                    # Число точек после запятой для отображаемых дробей
                    # precision=3
                    )

    # Преобразуем файл tree.dot в tree.png
    call(['dot', '-Tpng', 'tree.dot', '-o', 'tree.png'])
    # Вставляем картинку в блокнот
    # Image("tree.png")

    # data_test[np.isnan(data_test)] = -1
    test_result = model.predict(data_train_orig)

    # RESULT
    auc = sklearn.metrics.roc_auc_score(labels_test, test_result)

    gini = 2 * auc - 1

7.4. Tuning the hyper-parameters https://scikit-learn.org/stable/modules/grid_search.html

  • GridSearchCV - Exhaustive Grid Search, all parameter combinations
    • HalvingGridSearchCV - evaluating all the candidates with a small amount of resources and iteratively selects the best candidates, using more and more resources.
  • RandomizedSearchCV - given number of candidates
    • HalvingRandomSearchCV -

SH is an iterative selection process where all candidates (the parameter combinations) are evaluated with a small amount of resources at the first iteration. the resource is

  • the number of training samples
  • arbitrary numeric parameter such as n_estimators in a random forest.

parameters

  • factor (> 1) - each iteration, the number of resources per candidate is multiplied, candidates is divided

(3 usually works well)

  • HalvingRandomSearchCV: aggressive_elimination=True can also be used if the number of available resources is small.

RandomizedSearchCV vs GridSearchCV https://analyticsindiamag.com/why-is-random-search-better-than-grid-search-for-machine-learning/

7.5. feature importance

from sklearn.ensemble import GradientBoostingRegressor
dt = GradientBoostingRegressor()
indices = np.argsort(dt.feature_importances_)[::-1]  # sort indexes
print(indices)
for i in range(len(X_column_names)):  # первые 100
    print("%d. %s (%f)" % (i + 1, X_column_names[indices[i]], dt.feature_importances_[indices[i]] / 100))

7.6. Encoders - sklearn.preprocessing.*

  • OrdinalEncoder
  • OneHotEncoder -
    • min_frequency=0.5 - all values that have < min_frequency will be as 'others' column
  • TargetEncoder - target mean with the target mean conditioned on the value of the category, good for features with high cordinality and hight correlation with target. Shuffle by default, use internal cross-fitting.

7.7. suppress warnings

import warnings
warnings.filterwarnings("ignore", category=Warning)
from sklearn.metrics import precision_score
y_true = [0, 1, 2, 0, 1, 2]
y_pred = [0, 2, 0, 0, 0, 0]
print(precision_score(y_true, y_pred, average='macro'))
0.13333333333333333

8. TODO statsmodels

used in econometrics, generalised-linear models, time-series-analysis, statistical hypothesis testing, and regression models for "rigorous statistics", for explanatory analysis

9. TODO RAPIDS

GPU accelerated data science

10. TensorFlow (TF)

Apache 2.0

  • разработанная компанией Google
  • used for machine learning applications such as neural networks
  • Создается вычислительный граф. - Графовый фреймворк

‐ Cleverhans - фреймворд чтобы атаковать и защищать модели??

  • Lucid - визуализировать
  • define computation graph - позволяет автоматическое дифференцирование
    • Nodes - operators, varibles, constants
    • Edges - tensors

10.1. history

2.4.0

  • MultiWorkerMirroredStrategy - no longer experimental
  • TensorFlow Profiler now supports profiling `MultiWorkerMirroredStrategy`

10.2. terms

batch
weights and biases are only updated after all of the inputs and targets are presented
epoch
is one single pass over the entire training set
train_step
function that is called by fit() for every batch of data. Execute Forward pass with tf.GradientTape(). Return a dict mapping metric names to current value.
Operations (Ops)
high level operation on Tensor.
Kernel
implementation of an op tied to specific hardware/platform. Some ops have a one-to-one mapping from op to kernel while other ops use multiple kernels.
Gradient / GradFunc
The ‘backward mode’ definition of an op/kernel that computes the derivative of that function with regards to some input.

10.3. Features:

  • Stable
  • Well-documented sources
  • Flexibility
  • Portability
  • Scalability
  • Popularity

Cons:

  • Невозможно обучать распределенно
  • Метрический тензор нельзя запрограммировать

10.4. hello world

import tensorflow as tf
import timeit

# -- set device manually
try:
    gpus = tf.config.experimental.list_physical_devices('GPU')
    tf.config.set_visible_devices(gpus[0], 'GPU')
    logical_gpus = tf.config.list_logical_devices('GPU')
except RuntimeError as e:
    print(e)

# -- eager execution
# Note:  steps through all of the program operations, needed or not.
a = tf.Variable([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], trainable=False)
b = tf.Variable([[1.0, 2.0, 3.0]], trainable=False)
k = a * b
print(k)
# -- graph execution
# Note: graph execution enables portability outside Python and tends to offer better performance
# consist of: tf.Operation objects, which represent units of computation; and tf.Tensor objects, which represent the units of data that flow between operations
# using graph directly is depricated
# Graph execution only executes the operations necessary to produce the observable effects, which includes:  "Non-strict execution"

x = tf.random.uniform(shape=[10, 10], minval=-1, maxval=2, dtype=tf.dtypes.int32)

def power(x, y):
  result = tf.eye(10, dtype=tf.dtypes.int32)
  for _ in range(y):
    result = tf.matmul(x, result)
  return result

print("Eager execution:", timeit.timeit(lambda: power(x, 100), number=1000), "seconds")

power_as_graph = tf.function(power)
print("Graph execution:", timeit.timeit(lambda: power_as_graph(x, 100), number=1000), "seconds")

10.5. deployment

  • TensorFlow Serving - models on servers, be them in-house or on the cloud, and is used within the TensorFlow Extended (TFX) end-to-end Machine Learning platform.
    • deploy with static API.
    • tightly integrated with Google Cloud via Vertex AI and integrates with Kubernetes and Docker.
    • Android and iOS, as well as microcontrollers (ARM with Bazel or CMake) and embedded Linux (e.g. a Coral device)
  • TensorFlow Lite - on mobile or IoT/embedded devices

TFLite addresses 5 constraints for on-device Artificial Intelligence:

  • latency, connectivity, privacy, size, and power consumption

10.6. ecosystem

10.7. layours

  • tf.Module - is the base class for both tf.keras.layers.Layer and tf.keras.Model
  • tf.keras.layers.Layer
  • tf.keras.Model

10.8. Eager vs Grapth execution

Eager

  • evaluate operations immediately
  • do not build graphs
  • operations return actual values instead of graphs to run later

Graph @tf.function, tf.Graph

  • to accelerate your models.
  • Graph - set of tf.Operation objects, which represent units of computation; and tf.Tensor objects, which represent the units of data that flow between operations.
  • can be saved, run, and restored all without the original Python code.
  • By default, Model.fit() we will attempt to compile your model to a static graph

10.9. TF 2.0

10.9.1. tf.GradientTape API

for automatic differentiation using "reverse mode differentiation"

  • resources held by a GradientTape are released as soon as GradientTape.gradient() method is called
  • Trainable variables (created by tf.Variable or tf.compat.v1.get_variable, where trainable=True is default in both cases) are automatically watched.
  • at least one of inputs is being "watched".
with tf.GradientTape() as g:
  g.watch(x)
  y = x * x
dy_dx = g.gradient(y, x)

10.9.2. tf.function

TensorFlow graphs require static dtypes and shape dimensions. tf.function keeps a cache of concrete functions generated by tracing.

trace_cache_key as function of datatype and shape of every Tensor argument and tf.device() scope. For a Python primitive is its value. Key is used to determine if a new graph needs to be created or if a previously created graph can be invoked.

Nones:

  • Can only use Tensors arguments.
  • runs all stateful operations (e.g. tf.print)

Argumets must be either:

  • Tensor (ndarrays are converted to the equivalent Tensor)
  • list of Tensor
  • arbitrary Python value

The main takeaways and recommendations are:

  • Don't rely on Python side effects like object mutation or list appends.
  • tf.function works best with TensorFlow ops, rather than NumPy ops or Python primitives.
  • When in doubt, use the for x in y idiom.
  1. wrap function

    https://www.tensorflow.org/api_docs/python/tf/compat/v1/wrap_function

    tf.compat.v1.wrap_function

    • do not runs all stateful operations (e.g. tf.cond)
    • only trace the Python function once
    from tensorflow_core.python.eager.wrap_function import WrappedFunction, VariableHolder, wrap_function
    wf:WrappedFunction = wrap_function(f)
    
    class WrappedFunction(function.ConcreteFunction):
    """Callable object encapsulating a function definition and its gradient.
    
  2. AutoGraph включен в tf.function

    для преобразования if и for в tf.cond и tf.while.

10.9.3. migrate 1 to 2

import tensorflow.compat.v1 as tf tf.disable_v2_behavior()

  • Eager execution, v1.enable_eager_execution() - tf.Graph will fail - wrap this code in a with tf.Graph().as_default() context.
  • Resource variables, v1.enable_resource_variables() - 2.0 Resource variables are locked while being written to
  • Tensor shapes, v1.enable_v2_tensorshape() - t.shape[0].value will fail
  • Control flow, v1.enable_control_flow_v2()

10.9.4. custome layer

Custom layers

Methods:

  • __init__()
  • build()`: Called once from `__call__`, when we know the shapes of inputs and `dtype`.
  • call()

Arguments __init__():

trainable
Boolean, whether the layer's variables should be trainable.
name
String name of the layer.
dtype
The dtype of the layer's computations and weights (default of `None` means use `tf.keras.backend.floatx` in TensorFlow 2, or the type of the first input in TensorFlow 1).
dynamic
Set this to `True` if your layer should only be run eagerly, and should not be used to generate a static computation graph. This would be the case for a Tree-RNN or a recursive network, for example, or generally for any layer that manipulates tensors using Python control flow. If `False`, we assume that the layer can safely be used to generate a static computation graph.
class Linear(layers.Layer):

  def __init__(self, units=32):
    super(Linear, self).__init__()
    self.units = units

  def build(self, input_shape):
    self.w = self.add_weight(shape=(input_shape[-1], self.units),
                             initializer='random_normal',
                             trainable=True) # все self переменные попадают в model.variables  автоматически
    self.b = self.add_weight(shape=(self.units,),
                             initializer='random_normal',
                             trainable=True)

  def call(self, inputs):
    return tf.matmul(inputs, self.w) + self.b

10.9.5. decayed learning rate

optimizer = SGD(learning_rate=0.006, decay=0.003, momentum=0.3)
lr = optimizer._decayed_lr(tf.float32)
print("lr: %f" % lr)

10.10. Save a model

API

  • tf.compat.v1.train.Saver - binary format. Not-object-based
    • my_test_model-1000.index
    • my_test_model-1000.meta
    • my_test_model-1000.data-00000-of-00001
    • checkpoint - keeps a record of latest checkpoint files saved
  • tf.keras.Model
  • tf.compat.v2.train.Checkpoint - binary object-based checkpoints

10.10.1. v1 Saver loading:

steps

  1. with tf.compat.v1.Session() as sess: or tf.compat.v1.Session()
  2. saver = tf.compat.v1.train.import_meta_graph('my_test_model-1000.meta') # this will create the graph/network for you but we still need to load the value of the parameters that we had trained on this graph
  3. saver.restore(sess,tf.train.latest_checkpoint('./')) # restore the parameters of the network
  4. print(sess.run('w1:0')) - print saved value of w1.

Run:

graph = tf.compat.v1.get_default_graph()
w1 = graph.get_tensor_by_name("w1:0")
w2 = graph.get_tensor_by_name("w2:0")
feed_dict ={w1:13.0,w2:17.0}
op_to_restore = graph.get_tensor_by_name("op_to_restore:0")
 print sess.run(op_to_restore,feed_dict)

10.10.2. v2 saving loading

  • Checkpoints - exact value of all parameters (tf.Variable) - source code required
    • tf.keras.Model.save_weights(path/mymodel)
  • Model.save(path) - the parameter values && serialized description of the computation defined by the model. Source code not needed.

10.11. datasets

  1. tf.keras.datasets: https://www.tensorflow.org/api_docs/python/tf/keras/datasets
    • boston_housing module
    • cifar10 module
    • cifar100 module
    • fashion_mnist module
    • imdb module
    • mnist module
    • reuters module
  2. tensorflow_datasets

tfds.load is a thin wrapper around tfds.core.DatasetBuilder

10.11.1. install and use tfds

pip install tensorflow-datasets
import tensorflow_datasets as tfds
tfds.display_progress_bar(True)

# 1) easy way
ds = tfds.load('mnist', split='train', shuffle_files=True)
assert isinstance(ds, tf.data.Dataset)

10.11.2. download

# create directory required
from pathlib import Path
Path("/mnt/ssd/datasets/tensorflow_datasets/downloads/manual").mkdir(parents=True, exist_ok=True)

# test
# tfds.load('mnist', data_dir="/mnt/ssd/datasets/tensorflow_datasets")

import tensorflow_datasets as tfds
tfds.display_progress_bar(True)
# do not download 'robotics:mt_opt_rlds' and  'huggingface:wmt19'
l = [x for x in sorted(tfds.list_builders()) if ":" not in x ]
errors=[]
for x in l:
    try:
        ds = tfds.load(x, data_dir="/mnt/ssd/datasets/tensorflow_datasets")
    except Exception as e:
        errors.append(x)
print("datasets with errors:", errors)

10.11.3. landmark 2020

Number of unique landmark_id: 81313

import os
import pandas as pd
import tensorflow as tf
import numpy as np
# ------- data
def get_paths(path="/landmark-retrieval-2020/train", max_count=-1):
    index = ["0","1","2","3","4","5","6","7","8","9","a","b","c","d","e","f"]
    paths = []
    for a in index:
        for b in index:
            for c in index:
                paths.extend([path+f"/{a}/{b}/{c}/" + x for x in os.listdir(path+f"/{a}/{b}/{c}")])
        if max_count > 0 and len(paths) > max_count:
            break
    return paths

paths = get_paths("/landmark-retrieval-2020/train", 100)
df = pd.read_csv("/landmark-retrieval-2020/train.csv") # count 1580470 # id  landmark_id
mapping = {}
for path in paths:
    mapping[path.split('/')[-1].split('.')[0]] = path

df['path'] = df['id'].map(mapping) # add path column
df = df[~ df.path.isna()] # select records with "path" column
# - add probability for ...
alpha=0.6
counts_map = dict(df.groupby('landmark_id')['path'].agg(lambda x: len(x)))
df['counts'] = df['landmark_id'].map(counts_map)
df['prob'] = (  (1/df.counts**alpha) / (1/df.counts**alpha).max()).astype(np.float32) # ?

uniques = df['landmark_id'].unique() # unique classes
df['label'] = df['landmark_id'].map(dict(zip(uniques, range(len(uniques))))) # scale landmark_id to 0-

image_paths, labels, probs = df.path.to_numpy(), df.label.to_numpy(), df.prob.to_numpy()


def split_data(images, labels, train_size=0.9, shuffle=True):
    """ not stratified, train will have not all classes """
    # 1. Get the total size of the dataset
    size = len(images)
    # 2. Make an indices array and shuffle it, if required
    indices = np.arange(size)
    if shuffle:
        np.random.shuffle(indices)
    # 3. Get the size of training samples
    train_samples = int(size * train_size)
    # 4. Split data into training and validation sets
    x_train, y_train = images[indices[:train_samples]], labels[indices[:train_samples]]
    x_valid, y_valid = images[indices[train_samples:]], labels[indices[train_samples:]]
    return x_train, x_valid, y_train, y_valid

x_train, x_valid, y_train, y_valid = split_data(image_paths, labels)


# --------- dataset class
img_width = 736
img_height = 736

def encode_single_sample(img_path, label):
    print(img_path, label)
    # 1. Read image
    img = tf.io.read_file(img_path)
    # 2. Decode and convert to grayscale
    img = tf.io.decode_jpeg(img, channels=3)
    # 3. Convert to float32 in [0, 1] range
    img = tf.image.convert_image_dtype(img, tf.float32)
    # 4. Resize to the desired size
    img = tf.image.resize(img, [img_height, img_width])
    # 5. Transpose the image because we want the time
    # dimension to correspond to the width of the image.
    img = tf.transpose(img, perm=[1, 0, 2])
    # 7. Return a dict as our model is expecting two inputs
    return {"image": img, "label": label}


train_dataset = tf.data.Dataset.from_tensor_slices((x_train.astype(str), y_train.astype(int)))
train_dataset = train_dataset.map(encode_single_sample)
valid_dataset = tf.data.Dataset.from_tensor_slices((x_valid.astype(str), y_valid.astype(int)))
valid_dataset = valid_dataset.map(encode_single_sample)
# dataset = dataset.map(
#         lambda x, y, p: (read_image(x), y, p),
#         tf.data.experimental.AUTOTUNE)

# # anotehr approach:
# train_list = glob.glob('../input/landmark-retrieval-2020/train/*/*/*/*')
# test_list = glob.glob('../input/landmark-retrieval-2020/test/*/*/*/*')
# index_list = glob.glob('../input/landmark-retrieval-2020/index/*/*/*/*')

if __name__=="__main__":
    args = sys.argv[1:]
    print('args', args)
    main(args)

10.11.4. mnist

import tensorflow as tf

def encode_single_sample(img_path, label):
    tf.io.read_file(image_path)
    tf.image.decode_jpeg(image, channels=3)

mnist = tf.keras.datasets.mnist

(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
# -- dataset
batch_size=16
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
map(
        encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE
    )
train_dataset = train_dataset.shuffle(60000).repeat().batch(batch_size)
validation_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
# -- train
model.fit(train_dataset, epochs=5, steps_per_epoch=200)

10.12. tf.data.dataset

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))

dataset must consist of typeles - (x, y) by default, but it may be dictionary

10.12.1. test

for elem in train_dataset_y.take(10):
    print(elem.numpy().shape)
    # or
    print(elem['label'].numpy().shape)

print(train_dataset.__iter__().next())

10.13. install

see Tested build configurations tensorflow.org/install/source#linux

  1. apt clean; apt updatel apt purge cuda ; apt purge nvidia-*; apt autoremoveq
  2. install "cuda toolkit" from archive
  3. pip3 install tensorflow-gpu==2.3.0

10.14. install from source

Для компиляции tensorflow используется гугловая система сборки Bazel

10.15. APIs

  1. tf.nn - very low level
  2. tf.layers - higher
  3. tf.keras - highest
  4. просто сразу вычисляет tf.enable_eager_execution()

10.16. tf.placeholder

amy = placeholder - это тензоры в графе, которым присваивается имя amy

sess.run([tensors], feed_dict={amy: 1})  # заполняет placeholders and выполняет тензоры

10.17. Logger = Disable

import os
import tensorflow as tf
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

10.18. 4D tensor

  • N refers to the number of images in a batch.
  • H refers to the number of pixels in the vertical (height) dimension.
  • W refers to the number of pixels in the horizontal (width) dimension.
  • C refers to the channels. For example, 1 for black and white or grayscale and 3 for RGB.

Formats:

  • NCHW or channels_first - optimal for NVIDIA GPUs cuDNN - If not using the Intel MKL, some operations are not supported on CPU when using NCHW
  • NHWC or channels_last - TensorFlow default - little faster on CPU - we are working on tools to auto rewrite graphs to make switching between the formats transparent and take advantages of micro optimizations where a GPU op may be faster using NHWC

channels_last - default for keras

10.19. install

  • pip install tensorflow –user
  • import tensorflow as tf
  • tf.InteractiveSession()

10.20. Deploy

  • Java
  • C
  • Go

10.21. tensor

  • https://www.tensorflow.org/guide/tensors
  • Tensor a mathematical object analogous to but more general than a vector, represented by an array of components that are functions of the coordinates of a space
  • unit of data, geometric objects that describe linear relations between geometric vectors, scalars, and other

tensors

  • has Rank
  • set of primitive values shaped into an array of any number of dimensions
  • rank/dimension zero tensor - 5 - scalar - shape is []
  • rank/dimension 1 tensor - [ 1., 2., 3., 4. ] - Vector - shape is [4]
  • rank/dimension 2 tensor or a Matrix - shape [ 2, 4] - [ [ 1., 2., 3., 4. ], [ 5., 6., 7., 8. ] ]

Граф состоит из узлов op, связанных друг с другом, представляющих операции.

  • Операция выделяет память для своих выходов, которые доступны в конечных точках :0, :1 и т.д. - похожих на тензор

10.22. hardware

GPU могу ускорить работу сети в 10-20 раз[1]

CPU

  • С достаточно мощной видеокартой мощность процессора практически не важна, потому что всю нагрузку возмет GPU
  • желательно Intel® Xeon®, Intel® Xeon Phi™
  • если 2 видеокарты, то процессор должен их поддерживать.

GPU

  • две GPU лучше чем одна на 20%. Переносимость модели на систему без GPU реализована.
  • CUDA-Enabled NVIDIA video vard https://developer.nvidia.com/cuda-gpus
  • Deep Learning Primitives (cuDNN) - part of Deep Learning SDK, requires CUDA Toolkit
  • GPU Memory >=11 GB - больше лучше
  • чем больше FLOPS тем лучше
  • топы: NVIDIA QUADRO® GV100 или NVIDIA TITAN RTX
  • GPU Cooling - очень важен - Air cooling - для одного или двух если между ними поместится ещё две

RAM

  • RAM clock rates not required
  • RAM size больше чем GPU Memory одной из карт - больше памяти, удобнее работа для человека.

PSU if you have 4 GPUs with each 250 watts TDP and a CPU with 150 watts TDP, then you will need a PSU with a minimum of 4×250 + 150 + 100 = 1250 watts

Quandro P1000 PCE-3.0 кабинет 42 Соловьев

Счет на Кирила скинуть, Артем сказал скинуть счет на оплату, с Минофьевым согласовали, отправить в москву.

Андрей Свиридов поговорил с ЦФТ о возможности получить тестовый доступ к их облачному сервису расладвающему назначения на компоненты.

Почтовый ящик с заявками, текст и сканы.

  1. TensorFlow для глубокого обучения. Барат Рамсундар, Реза Босаг Заде. 2019г.
  2. https://timdettmers.com/2018/12/16/deep-learning-hardware-guide/
  1. проверить материнскую плату что она PCI-E 3.0
  1. Написать письмо
  2. Можно ли с keras использовать несколько GPU
  3. прочитать по автокредиту что прислал в почте, посмотреть бизнес процессы
  4. читать банковское дело.
  5. Военкомат!! 11:00

https://www.ferra.ru/review/computers/nvidia-geforce-gtx-1070-asus-gigabyte-msi-palit-zotac.htm

Выбор видеокарты PALIT GeForce GTX 1070 27030р - 29000р

GeForce RTX 2060

Железо 50 70

10.23. hello world

import tensorflow as tf
a = tf.add(3, 5)

sess = tf.Session()
print sess.run(a)
sess.close()
# or
with tf.Session() as sess:
  print sess.run(a)

10.24. main objects

  • tf.Session - содержит один глобальный граф
    • tf.InteractiveSession - makes itself the default
  • tf.Tensor
    • tf.constant(value, dtype=None, shape=None, name='Const', verify_shape=False)
      • stored in the graph definition
      • loading graphs expensive
  • tf.placeholder - input for graph

when constants are big

  • tf.Operation
  • tf.Graph - состоит из экземпляров tf.Tensor и tf.Operation.
    • Multiple graphs require multiple sessions, each will try to use all available resources by default
    • Can't pass data between them without passing them through python/numpy, which doesn't work in distributed
    • It’s better to have disconnected subgraphs within one graph
  • data types
    • tf.int32
    • tf.float32
    • tf.float64
    • tf.string
    • tf.bool

10.25. Переменные

  • tf.Varible - контейнер Tensor
  • tf.assign

Инициализация

init = tf.global_variables_initializer()
with tf.Session() as sess:
  sess.run(init)

nitialize a single variable

W = tf.Variable(tf.zeros([784,10]))
with tf.Session() as sess:
  sess.run(W.initializer)
  print W.eval()

10.26. TensorBoard

2 run it:

1 save it:

import tensorflow as tf
a = tf.constant(2, name="a")
b = tf.constant(3, name="b")
x = tf.add(a, b, name="add")
with tf.Session() as sess:
  # add this line to use TensorBoard.
  writer = tf.summary.FileWriter('./graphs, sess.graph)
  print sess.run(x)
writer.close() # close the writer when you’re done using

10.27. GPU

https://www.tensorflow.org/install/gpu

  • pip3 install tensorflow-gpu –user

Required:

  • import tensorflow as tf
  • config = tf.ConfigProto()
  • config.gpu_options.allow_growth = True
  • session = tf.Session(config=config)

10.28. keras

from tensorflow import keras
from tensorflow.python.keras.api._v2.keras.layers import BatchNormalization, Dense, Dropout, Activation, Flatten, \
    Conv2D, MaxPooling2D
from tensorflow.python.keras.api._v2.keras.models import Sequential

10.29. CNN

tf.nn.conv2d(feat,

  • weight, - input
  • strides=[1,1,1,1], - 1,2 or 4 - stride of the sliding window for each dimension of input
  • padding="VALID")+bias

tf.nn.max_pool(feat,

  • ksize=[1,2,2,1] - window per every dimension
  • strides=[1,2,2,1]
  • padding="VALID")

10.30. RNN and LSTM

10.30.2. batch

https://machinelearningmastery.com/stateful-stateless-lstm-time-series-forecasting-python/

You can set RNN layers to be 'stateful', which means that the states computed for the samples in one batch will be reused as initial states for the samples in the next batch. This assumes a one-to-one mapping between samples in different successive batches.

You can specify the initial state of RNN layers symbolically by calling them with the keyword argument initial_state. The value of initial_state should be a tensor or list of tensors representing the initial state of the RNN layer.

You can specify the initial state of RNN layers numerically by calling reset_states with the keyword argument states. The value of states should be a numpy array or list of numpy arrays representing the initial state of the RNN layer.

it may be possible to simulate a stateful LSTM with a stateless LSTM using a large batch size.

10.31. plot learning curve

print(history.history.keys()) # ['loss', 'acc', 'val_loss', 'val_acc']

from matplotlib import pyplot as plt

plt.figure(1)

# summarize history for accuracy

plt.subplot(211)
plt.plot(history.history['acc'])
plt.plot(history.history['val_acc'])
plt.title('model accuracy')
plt.ylabel('accuracy')
# plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')

# summarize history for loss

plt.subplot(212)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
# plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

10.32. plot CNN layout

summaryWriter = tf.summary.FileWriter("model_name") summaryWriter.add_graph(sess.graph)

summaryWriter.add_summary(sess.run(summaryMeanTest0,feed_dict={testImagePH:testMean[0]}),i+1)

10.33. Optimizer

softmaxLoss = tf.losses.softmax_cross_entropy(onehot_labels=labelOnehot, logits=output) -> float or [batch]

  • labelOnehot - оригиналы
  • logits - то что вернула сеть
  • reduction: str = Reduction.SUM_BY_NONZERO_WEIGHTS - default
  • optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost)
    • cost - ?
  • sess.run(tf.global_variables_initializer())
  • sess.run([optim,loss],feed_dict=batch)

ways:

  • minimize()
    1. opt = GradientDescentOptimizer(learning_rate=0.1)
    2. opt_op = opt.minimize(cost, var_list=<list of variables>) - computing the gradients and applying them to the variables
    3. sess.run([opt_op,loss], feed_dict=batch) or opt_op.run()
  • compute_gradients() - process the gradients before applying them
    1. opt = GradientDescentOptimizer(learning_rate=0.1)
    2. grads_and_vars = opt.compute_gradients(loss, <list of variables>)
    3. capped_grads_and_vars = [(MyCapper(gv[0]), gv[1]) for gv in grads_and_vars]
    4. opt.apply_gradients(capped_grads_and_vars)
    5. sess.run([opt,loss], feed_dict=batch)
lrGP_PH, lrC_PH = tf.placeholder(tf.float32, shape=[]), tf.placeholder(tf.float32, shape=[])
optim = tf.train.AdamOptimizer(learning_rate=lrC_PH).minimize(loss, global_step=tf.train.get_global_step())


lrC = opt.lrC*opt.lrCdecay**(i//opt.lrCstep)
batch[lrC_PH] = lrC
sess.run

10.34. models - tensorflow_models as tfm

10.34.1. install

pip3 install tf-models-official==2.13

/usr/local/lib/python3.8/dist-packages

pip3 install tf-models-official==2.13 ; apt install -y emacs-nox

10.34.3. mnist

cd models/official/legacy/image_classification python mnist_main.py python mnist_main.py -ds parameter_server –data_dir /workspace/mnist

-ds,–distribution_strategy: The Distribution Strategy to use for training. Accepted values are 'off', 'one_device', 'mirrored', 'parameter_server', 'collective', case insensitive. 'off' means not to use Distribution Strategy; 'default' means to choose from `MirroredStrategy` or `OneDeviceStrategy` according to the number of GPUs. (default: 'mirrored')

-ng,–num_gpus: How many GPUs to use at each worker with the DistributionStrategies API. The default is 1. (default: '1') (an integer)

-te,–train_epochs: The number of epochs used to train. (default: '1') (an integer)

official.utils.flags._base: -bs,–batch_size: Batch size for training and evaluation. When using multiple gpus, this is the global batch size for all devices. For example, if the batch size is 32 and there are 4 GPUs, each GPU will get 8 examples on each step. (default: '1024') (an integer)

-te,–train_epochs: The number of epochs used to train. (default: '1') (an integer)

10.34.4. dummy dataset for MNIST

dummy_data = (
        tf.ones(shape=(10, 28, 28, 1), dtype=tf.int32),
        tf.range(10),
    )
    datasets = (
        tf.data.Dataset.from_tensor_slices(dummy_data),
        tf.data.Dataset.from_tensor_slices(dummy_data),
    )

10.34.5. Mobilenet example

# https://www.tensorflow.org/api_docs/python/tfm/vision/backbones/MobileNet
# https://stackoverflow.com/questions/63284471/tensorflow-use-model-inside-another-model-as-layer
import tensorflow as tf
import tensorflow_models as tfm
from tensorflow.keras import Input
from tensorflow.keras import Model


IS = 28
INPUT_SIZE = (IS, IS)
OUTPUT_SIZE = 2

input_specs = tf.keras.layers.InputSpec(shape=[None, IS, IS, 3])

sub_model = tfm.vision.backbones.MobileNet(
    input_specs=input_specs,
    filter_size_scale=0.65,
)

def model_test(input_shape, sub_model):
  inputs = Input(input_shape)
  intermedio = sub_model(inputs)
  iv = list(intermedio.values())
  f0 = tf.keras.layers.Flatten()(iv[0])
  f1 = tf.keras.layers.Flatten()(iv[1])
  f2 = tf.keras.layers.Flatten()(iv[2])
  dense_intr = tf.keras.layers.Concatenate()([f0, f1, f2])
  outputs = tf.keras.layers.Dense(OUTPUT_SIZE, activation=tf.keras.activations.softmax, name="d-out")(dense_intr)
  model = Model(inputs=inputs, outputs=outputs)
  return model

model = model_test((IS, IS, 3), sub_model)

model = model_test(INPUT_SIZE, sub_model)


# -- inference with dummy test
inputs = tf.keras.Input(shape=(IS, IS, 3), batch_size=1)
endpoints = model(inputs=inputs)
# -- compile
model.compile(loss="categorical_crossentropy", optimizer="adam")
# -- train

print(model.name)
print(endpoints)

10.34.6. RESNET example

# https://www.tensorflow.org/api_docs/python/tfm/vision/backbones/MobileNet
# https://stackoverflow.com/questions/63284471/tensorflow-use-model-inside-another-model-as-layer
import tensorflow as tf
import tensorflow_models as tfm
from tensorflow.keras import Input
from tensorflow.keras import Model
import os
import pandas as pd
import numpy as np
IS = 736
INPUT_SIZE = (IS, IS, 3)
OUTPUT_SIZE = None # lets get count of classes from Data
BATCH_SIZE = 5
DROUPOUT_RATE=0.2
# ---- Data ----
def get_paths(path="/landmark-retrieval-2020/train", max_count=-1):
    index = ["0","1","2","3","4","5","6","7","8","9","a","b","c","d","e","f"]
    paths = []
    for a in index:
        for b in index:
            for c in index:
                paths.extend([path+f"/{a}/{b}/{c}/" + x for x in os.listdir(path+f"/{a}/{b}/{c}")])
        if max_count > 0 and len(paths) > max_count:
            break
    return paths

paths = get_paths("/landmark-retrieval-2020/train", 150000)
df = pd.read_csv("/landmark-retrieval-2020/train.csv") # count 1580470 # id  landmark_id
mapping = {}
for path in paths:
    mapping[path.split('/')[-1].split('.')[0]] = path

df['path'] = df['id'].map(mapping) # add path column
df = df[~ df.path.isna()] # select records with "path" column
# - add probability for ...
alpha=0.6
counts_map = dict(df.groupby('landmark_id')['path'].agg(lambda x: len(x)))
df['counts'] = df['landmark_id'].map(counts_map)
df['prob'] = (  (1/df.counts**alpha) / (1/df.counts**alpha).max()).astype(np.float32) # ?
# select classes where we have enough examples
print("df[df.counts >70].shape", df[df.counts >70].shape) # >>> (4934, 5)
df = df[df.counts >70]
uniques = df['landmark_id'].unique() # unique classes
OUTPUT_SIZE = len(uniques)
df['label'] = df['landmark_id'].map(dict(zip(uniques, range(len(uniques))))) # scale landmark_id to 0-

image_paths, labels, probs = df.path.to_numpy(), df.label.to_numpy(), df.prob.to_numpy()


def split_data(images, labels, train_size=0.9, shuffle=True):
    # 1. Get the total size of the dataset
    size = len(images)
    # 2. Make an indices array and shuffle it, if required
    indices = np.arange(size)
    if shuffle:
        np.random.shuffle(indices)
    # 3. Get the size of training samples
    train_samples = int(size * train_size)
    # 4. Split data into training and validation sets
    x_train, y_train = images[indices[:train_samples]], labels[indices[:train_samples]]
    x_valid, y_valid = images[indices[train_samples:]], labels[indices[train_samples:]]
    return x_train, x_valid, y_train, y_valid

x_train, x_valid, y_train, y_valid = split_data(image_paths, labels)

# ----- Model ----
# -- sub_model - depend on model
input_specs = tf.keras.layers.InputSpec(shape=[None, IS, IS, 3])

sub_model = tfm.vision.backbones.resnet.ResNet(
    model_id = 50,
    input_specs = input_specs,
)

# -- Get outputs tensor of submodel
inputs = tf.keras.Input(shape=INPUT_SIZE, batch_size=1)
endpoints = sub_model(inputs=inputs)
print("endpoints", endpoints)
print()

# -- wrap sub_model in new Model to add input and output layers
def wrap_model(input_shape, sub_model):
    """ add inputs and outputs to model """
    inputs = Input(input_shape)
    intermedio = sub_model(inputs)
    # """Merge outputs - depende on model"""
    pooling = tf.keras.layers.GlobalAveragePooling2D(name='head/pooling')
    # dropout = tf.keras.layers.Dropout(DROUPOUT_RATE, name='head/dropout')
    # dense = tf.keras.layers.Dense(dense_units, name='head/dense')
    # x = intermedio
    # x = pooling(x)
    # x = dropout(x)
    # x = dense(x)
    iv = list(intermedio.values())
    f0 = tf.keras.layers.Flatten()(pooling(iv[0]))
    f1 = tf.keras.layers.Flatten()(pooling(iv[1]))
    f2 = tf.keras.layers.Flatten()(pooling(iv[2]))
    f3 = tf.keras.layers.Flatten()(pooling(iv[3]))
    x = tf.keras.layers.Concatenate()([f0, f1, f2, f3])
    # final layout:
    outputs = tf.keras.layers.Dense(OUTPUT_SIZE, activation=tf.keras.activations.softmax, name="d-out")(x)
    model = Model(inputs=inputs, outputs=outputs)
    return model

model = wrap_model(INPUT_SIZE, sub_model)
model.summary()
print("model.layers[0]._name", model.layers[0]._name)
model.layers[0]._name = "image"
# -- compile
model.compile(loss="categorical_crossentropy", optimizer="adam")


# ---- Dataset class ----
img_width = 736
img_height = 736

def encode_single_sample(img_path, label):
    # 1. Read image
    img = tf.io.read_file(img_path)
    # 2. Decode and convert to grayscale
    img = tf.io.decode_jpeg(img, channels=3)
    # 3. Convert to float32 in [0, 1] range
    img = tf.image.convert_image_dtype(img, tf.float32)
    # 4. Resize to the desired size
    img = tf.image.resize(img, [img_height, img_width])
    # 5. Transpose the image because we want the time
    # dimension to correspond to the width of the image.
    img = tf.transpose(img, perm=[1, 0, 2])
    # 7. Return a dict as our model is expecting two inputs
    # layer = tf.keras.layers.CategoryEncoding(num_tokens=OUTPUT_SIZE, output_mode="one_hot")
    label = tf.one_hot(label, OUTPUT_SIZE)
    return img, label


train_dataset = tf.data.Dataset.from_tensor_slices((x_train.astype(str), y_train.astype(int))).skip(df.shape[0] - df.shape[0]//4)
train_dataset = train_dataset.map(lambda x, y: encode_single_sample(x, y), tf.data.experimental.AUTOTUNE)

train_dataset = train_dataset.batch(BATCH_SIZE).prefetch(100)

validation_dataset = tf.data.Dataset.from_tensor_slices((x_valid.astype(str), y_valid.astype(int))).skip(df.shape[0] - df.shape[0]//4)
validation_dataset = validation_dataset.map(lambda x, y: encode_single_sample(x, y), tf.data.experimental.AUTOTUNE)
validation_dataset = train_dataset.prefetch(100)

# ---- train ----
model.fit(train_dataset, epochs=1)
# -- checks the model's performance
print("evaluate")
model.evaluate(validation_dataset, verbose=2)
# -- inferece
print("inference", x_valid[0], y_valid[0])
im, l = encode_single_sample(x_valid[0], y_valid[0])
im = tf.expand_dims(im, axis=0)
print("im", im.shape)
predictions = model.predict(im, batch_size=1)
print(np.argmax(predictions))
print("label:", y_valid[0])

10.35. TensorFlow Serving

10.36. TODO TFX pipeline - MLOps

is a portable implementation of an ML workflow that can be run on various orchestrators, such as: Apache Airflow, Apache Beam, and Kubeflow Pipelines.

10.37. loss

  • loss = tf.losses.softmax_cross_entropy(onehot_labels=labelOnehot, logits=output, reduction=tf.losses.Reduction.MEAN)
  • lossm = tf.metrics.mean(loss)

10.39. custom metric

levels:

  • function -> values summarized and divided by count
  • class -> gives full control

10.39.1. function

total categorical accuracy

def total_categorical_accuracy(y_true, y_pred):
        # a = tf.cast(tf.math.equal(tf.argmax(y_true, axis=-1), tf.argmax(y_pred, axis=-1)), dtype=y_pred.dtype)
        a = keras.metrics.categorical_accuracy(y_true, y_pred)
        classes = tf.constant(a.shape[1], a.dtype)
        a2 = tf.reduce_sum(a, axis=-1)
        c = tf.cast(tf.math.equal(a2, classes), dtype=classes.dtype)
        return c
model.compile(loss=loss, optimizer=opt.optimizer, metrics=["categorical_accuracy",total_categorical_accuracy])

10.39.2. class

class ConfusionMatrixMetric(tf.keras.metrics.Metric):


    def update_state(self, y_true, y_pred,sample_weight=None):
        self.total_cm.assign_add(self.confusion_matrix(y_true,y_pred))
        return self.total_cm

    def result(self):
        return self.process_confusion_matrix()

    def confusion_matrix(self,y_true, y_pred):
        """
        Make a confusion matrix
        """
        y_pred=tf.argmax(y_pred,1)
        cm=tf.math.confusion_matrix(y_true,y_pred,dtype=tf.float32,num_classes=self.num_classes)
        return cm

    def process_confusion_matrix(self):
        "returns precision, recall and f1 along with overall accuracy"
        cm=self.total_cm
        diag_part=tf.linalg.diag_part(cm)
        precision=diag_part/(tf.reduce_sum(cm,0)+tf.constant(1e-15))
        recall=diag_part/(tf.reduce_sum(cm,1)+tf.constant(1e-15))
        f1=2*precision*recall/(precision+recall+tf.constant(1e-15))
        return precision,recall,f1

10.40. distributed training

10.40.1. API

  • tf.distribute.Strategy
  • high-level API Keras Model.fit
  • Custom training loop
  • Estimator API (Limited Support)

Notes:

  • Custom training loops: Eager mode is only recommended for debugging, in a graph recommended using tf.function (custom training loops)

10.40.2. terms

replica
copy of the model
Parameter servers
machines that hold a single copy of parameters/variables
Replica context
strategy.run function - when executing the computation function that is being replicated.
Cross-replica context
when you enter a strategy.scope
Update context
tf.distribute.StrategyExtended.update call
Reductions
method of aggregating multiple values into one value (sync training)
All-reduce
is an algorithm for performing a reduction on values from multiple devices and making the result available on all of those devices
Mirrored variables
variables that are created on multiple devices, where we keep the variables in sync by applying the same updates to every copy.
Distribute-aware layers
generally called in a replica context.

10.40.3. Synchronous vs asynchronous training

sync - via all-reduce

  • workers train over different slices of input data (Data parallelism)
  • aggregating gradients at each step
  • the updates from each replica are aggregated together before updating the model variables

async - via parameter server architecture

  • all workers are independently training over the input data and updating variables asynchronously
  • each replica updates the model variables independently

groups:

  • replicas partitioned into groups which are in sync within each group but async between groups.

10.40.4. strategies

MultiWorkerMirroredStrategy is very similar to MirroredStrategy. It implements synchronous distributed training across multiple workers, each with potentially multiple GPUs.

  1. MirroredStrategy

    tf.distribute.MirroredStrategy

    mirrors variables to multiple devices.

    Each variable in the model is mirrored across all the replicas. These variables are kept in sync with each other by applying identical updates.

    1. kubeflow ex MultiWorkerMirroredStrategy
      """An example of multi-worker training with Keras model using Strategy API."""
      
      from __future__ import absolute_import, division, print_function
      
      import argparse
      import json
      import os
      
      import tensorflow_datasets as tfds
      import tensorflow as tf
      from tensorflow.keras import layers, models
      
      
      def make_datasets_unbatched():
        BUFFER_SIZE = 10000
      
        # Scaling MNIST data from (0, 255] to (0., 1.]
        def scale(image, label):
          image = tf.cast(image, tf.float32)
          image /= 255
          return image, label
      
        datasets, _ = tfds.load(name='mnist', with_info=True, as_supervised=True)
      
        return datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE)
      
      
      def build_and_compile_cnn_model():
        model = models.Sequential()
        model.add(
            layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.Flatten())
        model.add(layers.Dense(64, activation='relu'))
        model.add(layers.Dense(10, activation='softmax'))
      
        model.summary()
      
        model.compile(optimizer='adam',
                      loss='sparse_categorical_crossentropy',
                      metrics=['accuracy'])
      
        return model
      
      
      def decay(epoch):
        if epoch < 3: #pylint: disable=no-else-return
          return 1e-3
        if 3 <= epoch < 7:
          return 1e-4
        return 1e-5
      
      
      def main(args):
      
        # MultiWorkerMirroredStrategy creates copies of all variables in the model's
        # layers on each device across all workers
        # if your GPUs don't support NCCL, replace "communication" with another
        strategy = tf.distribute.MultiWorkerMirroredStrategy(
            communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.AUTO))
      
        BATCH_SIZE_PER_REPLICA = 64
        BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
      
        with strategy.scope():
          ds_train = make_datasets_unbatched().batch(BATCH_SIZE).repeat()
          options = tf.data.Options()
          options.experimental_distribute.auto_shard_policy = \
              tf.data.experimental.AutoShardPolicy.DATA
          ds_train = ds_train.with_options(options)
          # Model building/compiling need to be within `strategy.scope()`.
          multi_worker_model = build_and_compile_cnn_model()
      
        # Define the checkpoint directory to store the checkpoints
        checkpoint_dir = args.checkpoint_dir
      
        # Name of the checkpoint files
        checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
      
        # Function for decaying the learning rate.
        # You can define any decay function you need.
        # Callback for printing the LR at the end of each epoch.
        class PrintLR(tf.keras.callbacks.Callback):
      
          def on_epoch_end(self, epoch, logs=None): #pylint: disable=no-self-use
            print('\nLearning rate for epoch {} is {}'.format(
              epoch + 1, multi_worker_model.optimizer.lr.numpy()))
      
        callbacks = [
            tf.keras.callbacks.TensorBoard(log_dir='./logs'),
            tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                               save_weights_only=True),
            tf.keras.callbacks.LearningRateScheduler(decay),
            PrintLR()
        ]
      
        # Keras' `model.fit()` trains the model with specified number of epochs and
        # number of steps per epoch. Note that the numbers here are for demonstration
        # purposes only and may not sufficiently produce a model with good quality.
        multi_worker_model.fit(ds_train,
                               epochs=10,
                               steps_per_epoch=70,
                               callbacks=callbacks)
      
        # Saving a model
        # Let `is_chief` be a utility function that inspects the cluster spec and
        # current task type and returns True if the worker is the chief and False
        # otherwise.
        def is_chief():
          return TASK_INDEX == 0
      
        if is_chief():
          model_path = args.saved_model_dir
      
        else:
          # Save to a path that is unique across workers.
          model_path = args.saved_model_dir + '/worker_tmp_' + str(TASK_INDEX)
      
        multi_worker_model.save(model_path)
      
      
      if __name__ == '__main__':
        os.environ['NCCL_DEBUG'] = 'INFO'
      
        tfds.disable_progress_bar()
      
        # to decide if a worker is chief, get TASK_INDEX in Cluster info
        tf_config = json.loads(os.environ.get('TF_CONFIG') or '{}')
        TASK_INDEX = tf_config['task']['index']
      
        parser = argparse.ArgumentParser()
        parser.add_argument('--saved_model_dir',
                            type=str,
                            required=True,
                            help='Tensorflow export directory.')
      
        parser.add_argument('--checkpoint_dir',
                            type=str,
                            required=True,
                            help='Tensorflow checkpoint directory.')
      
        parsed_args = parser.parse_args()
        main(parsed_args)
      
      
  2. CentralStorageStrategy (experimental)

    tf.distribute.experimental.CentralStorageStrategy

    puts all variables on a single device on the same machine (and does sync training).

  3. ParameterServerStrategy (experimental)

    creates variables on the parameter servers.

    api

    • Model.fit
    • custom training loop
      • tf.distribute.experimental.ParameterServerStrategy (tensorflow 1.0)
      • tf.distribute.ParameterServerStrategy

    notes:

    • data-parallel method
    • All replicas that want to operate on a variable retrieve parameters/variables from Par server at the beginning of a step and send an update to be applied at the end of the step. These can in principle support either sync or async training, but right now we only have support for async training with parameter servers.
    • workers and parameter servers
    • Variables are created on parameter servers and they are read and updated by workers in each step
    • workers read and update these variables independently without synchronizing with each other (asynchronous training)
    • 'cluster' with several 'jobs', and each of the jobs may have one or more 'tasks'

    recommended to have:

    • One coordinator job (has the job name or task type: chief) - creates resources, dispatches training tasks, writes checkpoints, and deals with task failures.
      • know the addresses and ports of all other TensorFlow servers, except the evaluator.
    • Multiple worker jobs (job name or task type: worker)
      • need to know which port they need to listen to.
      • all workers should have the same number of GPUs available.
      • each worker receives the same dataset, except when it is shuffled differently
    • Multiple parameter server jobs (job name or task type: ps) - tf.distribute.Server
      • need to know which port they need to listen to.
    • evaluator (optional) -

    worker and ps

    • run tf.distribute.Server instances that listen for requests from the chief.
    • dataset_fn will be wrapped into a tf.function and then executed on each worker to generate the data pipeline.
    • apply the transformation inside the dataset_fn via tf.data.Dataset.map

    datasets allowed to use:

    • tf.data.Dataset
    • tf.distribute.DistributedDataset
    • tf.keras.utils.experimental.DatasetCreator - the code in dataset_fn will be invoked on the input device, which is usually the CPU, on each of the worker machines.

    repeat and steps_per_epoch

    • Dataset.repeat — which repeats a dataset indefinitely when called without an argument—and specify the steps_per_epoch argument in the Model.fit call.

    Note from TF (Model.fit):

    • When using a `tf.keras.utils.experimental.DatasetCreator`, `steps_per_epoch`, `validation_steps`, `steps`, or `pss_evaluation_shards` argument must be provided in `Model.fit`, `Model.evaluate`, or `Model.predict`
      • validation_steps - for validation data
      • pss_evaluation_shards - The number of shards should be at least the number of workers for good performance.
    1. tf.data.experimental.AutoShardPolicy
      • OFF: No sharding will be performed.
      • AUTO: Attempts FILE-based sharding, falling back to DATA-based sharding.
      • FILE: Shards by input files (i.e. each worker will get a set of files to process). When this option is selected, make sure that there is at least as many files as workers. If there are fewer input files than workers, a runtime error will be raised.
      • DATA: Shards by elements produced by the dataset. Each worker will process the whole dataset and discard the portion that is not for itself. Note that for this mode to correctly partitions the dataset elements, the dataset needs to produce elements in a deterministic order.
      • HINT: Looks for the presence of shard(SHARD_HINT, …) which is treated as a placeholder to replace with shard(num_workers, worker_index).

      usage:

      • options = tf.data.Options()
      • options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
      • train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
      • train_dataset = train_dataset.with_options(options)

      AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2"

    2. Evaluation

      For users using Model.fit, Model.evaluate uses inline (distributed) evaluation under the hood.

      • inline evaluation
      • sidecar evaluation
    3. algorithm

      explanation 2014 https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-li_mu.pdf

      useful to compare "parameter server" to more general-purpose distributed systems:

      • which mandate synchronous, iterative communication - iterative MapReduce framework
      • Distributed GraphLab - asycnronously schedules communication using a graph abstraction.

      core goal of parameter server:

      • preserving state between iterations

      Мы, как и прежде, создаём копии модели на всех воркерах. 8

      • парализм данных
    4. model and fit

      https://www.tensorflow.org/api_docs/python/tf/keras/Model

      • steps_per_epoch - Total number of steps (batches of samples) before declaring one epoch finished and starting the next epoch
    5. dataset

      batches that straddle epoch boundaries - пакетов, которые пересекают границы эпох

      • repeat with no argument - infinity
      • repeat + batch = batches that straddle epoch boundaries
      • batch + repeat = clear epoch separation
      • shuffle + repeat = show every element of one epoch before moving to the next
      • repeat + shuffle = mixes the epoch boundaries together
    6. usage
      # ---- who do what
      cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
      if cluster_resolver.task_type in ("worker", "ps"):
          # Start a TensorFlow server and wait.
          # Set the environment variable to allow reporting worker and ps failure to the
          # coordinator. This is a workaround and won't be necessary in the future.
          os.environ["GRPC_FAIL_FAST"] = "use_caller"
      
          server = tf.distribute.Server(
              cluster_resolver.cluster_spec(),
              job_name=cluster_resolver.task_type,
              task_index=cluster_resolver.task_id,
              protocol=cluster_resolver.rpc_layer or "grpc",
              start=True)
          server.join()
      elif cluster_resolver.task_type == "evaluator":   # Run sidecar evaluation
          pass # note used
      else:  # Run the coordinator.
      
      
      
      
      # ---- ParameterServerStrategy object. will use all the available GPUs on each worker
      NUM_PS=1
      variable_partitioner = (
          tf.distribute.experimental.partitioners.MinSizePartitioner(
              min_shard_bytes=(256 << 10),
              max_shards=NUM_PS))
      
      strategy = tf.distribute.ParameterServerStrategy(
          cluster_resolver,
          variable_partitioner=variable_partitioner)
      
      # -- trivial model
      with strategy.scope(): # dataset_fn will be wrapped into a tf.function and then executed on each worker to generate the data pipeline.
        model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
        model.compile(tf.keras.optimizers.legacy.SGD(), loss="mse", steps_per_execution=10)
      
      
      
    7. usage working parameter server strategy for TF 2.0
      import tensorflow as tf
      import os
      # ---- who do what
      cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
      
      # -- set GPU for worker
      def set_gpu():
          gpus = tf.config.list_physical_devices('GPU')
          if gpus:
              # Restrict TensorFlow to only use the first GPU
              try:
                  tf.config.set_visible_devices(gpus[0], 'GPU')
                  logical_gpus = tf.config.list_logical_devices('GPU')
                  print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPU")
              except RuntimeError as e:
                  # Visible devices must be set before GPUs have been initialized
                  print(e)
      
      if cluster_resolver.task_type in ("worker"):
          set_gpu()
      
      # -- wait for task for worker and ps
      if cluster_resolver.task_type in ("worker", "ps"):
          # Start a TensorFlow server and wait.
          # Set the environment variable to allow reporting worker and ps failure to the
          # coordinator. This is a workaround and won't be necessary in the future.
          os.environ["GRPC_FAIL_FAST"] = "use_caller"
      
          server = tf.distribute.Server(
              cluster_resolver.cluster_spec(),
              job_name=cluster_resolver.task_type,
              task_index=cluster_resolver.task_id,
              protocol=cluster_resolver.rpc_layer or "grpc",
              start=True)
          print("cluster_resolver.task_type", cluster_resolver.task_type)
          print("cluster_resolver.task_id", cluster_resolver.task_id)
          print("cluster_resolver.rpc_layer", cluster_resolver.rpc_layer or "grpc")
          server.join()
      elif cluster_resolver.task_type == "evaluator":   # Run sidecar evaluation
          pass # note used
      else:  # Run the coordinator.
          # ---- ParameterServerStrategy object. will use all the available GPUs on each worker
          NUM_PS=1
          variable_partitioner = (
              tf.distribute.experimental.partitioners.MinSizePartitioner(
                  min_shard_bytes=(256 << 10),
                  max_shards=NUM_PS))
      
          strategy = tf.distribute.ParameterServerStrategy(
              cluster_resolver,
              variable_partitioner=variable_partitioner)
      
      
          # -- data
          mnist = tf.keras.datasets.mnist
          (x_train, y_train), (x_test, y_test) = mnist.load_data()
          x_train, x_test = x_train / 255.0, x_test / 255.0
      
          # -- trivial model
          with strategy.scope(): # dataset_fn will be wrapped into a tf.function and then executed on each worker to generate the data pipeline.
              # -- Dataset TF class
              batch_size=16
              train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
              train_dataset = train_dataset.shuffle(60000).repeat().batch(batch_size)
              validation_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
              validation_dataset = validation_dataset.shuffle(60000).batch(batch_size)
              # -- model
              model = tf.keras.models.Sequential([
                  tf.keras.layers.Flatten(input_shape=(28, 28)),
                  tf.keras.layers.Dense(128, activation='relu'),
                  tf.keras.layers.Dropout(0.2),
                  tf.keras.layers.Dense(10)
              ])
      
              loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
              model.compile(optimizer='adam',
                            loss=loss_fn,
                            metrics=['accuracy'],
                            pss_evaluation_shards='auto')
      
              # -- train
              model.fit(train_dataset, epochs=5, steps_per_epoch=300)
              # -- save
              model.save('aa.keras', overwrite=True, save_format="tf")  # The file needs to end with the .keras extension
          model = tf.keras.models.load_model('aa.keras')
          # -- checks the model's performance
          model.evaluate(validation_dataset, verbose=2)
          # -- inferece
          predictions = model(x_train[:1]).numpy()
          import numpy as np
          print(np.argmax(predictions))
          print(y_train[:1])
      
    8. usage working parameter server strategy for TF 2.0 v2
      
      
    9. usage3 dataset creator (comment several prams)
      import tensorflow as tf
      import os
      # ---- who do what
      cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
      
      # -- set GPU for worker
      def set_gpu():
          gpus = tf.config.list_physical_devices('GPU')
          if gpus:
              # Restrict TensorFlow to only use the first GPU
              try:
                  tf.config.set_visible_devices(gpus[0], 'GPU')
                  logical_gpus = tf.config.list_logical_devices('GPU')
                  print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPU")
              except RuntimeError as e:
                  # Visible devices must be set before GPUs have been initialized
                  print(e)
      
      if cluster_resolver.task_type in ("worker"):
          set_gpu()
      
      # -- wait for task for worker and ps
      if cluster_resolver.task_type in ("worker", "ps"):
          # Start a TensorFlow server and wait.
          # Set the environment variable to allow reporting worker and ps failure to the
          # coordinator. This is a workaround and won't be necessary in the future.
          os.environ["GRPC_FAIL_FAST"] = "use_caller"
      
          server = tf.distribute.Server(
              cluster_resolver.cluster_spec(),
              job_name=cluster_resolver.task_type,
              task_index=cluster_resolver.task_id,
              protocol=cluster_resolver.rpc_layer or "grpc",
              start=True)
          print("cluster_resolver.task_type", cluster_resolver.task_type)
          print("cluster_resolver.task_id", cluster_resolver.task_id)
          print("cluster_resolver.rpc_layer", cluster_resolver.rpc_layer or "grpc")
          server.join()
      elif cluster_resolver.task_type == "evaluator":   # Run sidecar evaluation
          pass # note used
      else:  # Run the coordinator.
          # def dataset_fn(input_context):
          #     dataset = dataset.map(preprocessing_layer)
          #     return dataset
      
          # dataset_creator = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
      
          # ---- ParameterServerStrategy object. will use all the available GPUs on each worker
          NUM_PS=1
          variable_partitioner = (
              tf.distribute.experimental.partitioners.MinSizePartitioner(
                  min_shard_bytes=(256 << 10),
                  max_shards=NUM_PS))
      
          strategy = tf.distribute.ParameterServerStrategy(
              cluster_resolver,
              variable_partitioner=variable_partitioner)
      
      
          # -- data
          mnist = tf.keras.datasets.mnist
          (x_train, y_train), (x_test, y_test) = mnist.load_data()
          x_train, x_test = x_train / 255.0, x_test / 255.0
      
          # -- trivial model
          with strategy.scope(): # dataset_fn will be wrapped into a tf.function and then executed on each worker to generate the data pipeline.
              # -- Dataset TF class
              train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
              validation_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
              # -- model
              model = tf.keras.models.Sequential([
                  tf.keras.layers.Flatten(input_shape=(28, 28)),
                  tf.keras.layers.Dense(128, activation='relu'),
                  tf.keras.layers.Dropout(0.2),
                  tf.keras.layers.Dense(10)
              ])
      
              loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
      
              # steps_per_execution=10,
              ,
                            pss_evaluation_shards='auto'
              model.compile(optimizer='adam',
                            loss=loss_fn,
                            metrics=['accuracy'])
      
              # -- train
      
              model.fit(x_train, y_train, epochs=5, steps_per_epoch=3)
              model.fit(train_dataset, epochs=5, steps_per_epoch=3000)
              # -- checks the model's performance
              model.evaluate(validation_dataset, verbose=2)
              # # -- inferece
              # predictions = model(x_train[:1]).numpy()
              # import numpy as np
              # print(np.argmax(predictions))
              # print(y_train[:1])
      
    10. mnist last version
      # Disable all GPUs. This prevents errors caused by all workers trying to use the same GPU. In a real-world application, each worker would be on a different machine.
      # import os
      # os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
      
      import tensorflow as tf
      import os
      import logging
      import multiprocessing
      
      tf.get_logger().setLevel(logging.DEBUG)
      
      # ---- who do what
      cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
      
      # -- set GPU for worker
      def set_gpu():
          gpus = tf.config.list_physical_devices('GPU')
          if gpus:
              # Restrict TensorFlow to only use the first GPU
              try:
                  for device in gpus:
                      tf.config.experimental.set_memory_growth(device, True)
                  # tf.config.set_logical_device_configuration(
                  #         gpus[0],
                  #         [tf.config.LogicalDeviceConfiguration(memory_limit=3024)])
                  gpu_devices = tf.config.experimental.list_physical_devices('GPU')
                  tf.config.set_visible_devices(gpus[0], 'GPU')
                  logical_gpus = tf.config.list_logical_devices('GPU')
                  print()
                  print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPU")
                  print()
                  cpu_ph = tf.config.list_physical_devices('CPU')
                  cpu_lg = tf.config.list_logical_devices('CPU')
                  print(len(cpu_ph), "Physical CPUs,", len(cpu_lg), "Logical CPU")
      
              except RuntimeError as e:
                  # Visible devices must be set before GPUs have been initialized
                  print(e)
      
      # if cluster_resolver.task_type in ("worker", "ps"):
      set_gpu() # for all
      
      # -- wait for task for worker and ps
      if cluster_resolver.task_type in ("worker", "ps"):
          # Start a TensorFlow server and wait.
          # Set the environment variable to allow reporting worker and ps failure to the
          # coordinator. This is a workaround and won't be necessary in the future.
          os.environ["GRPC_FAIL_FAST"] = "use_caller"
      
          # # Workers need some inter_ops threads to work properly.
          worker_config = tf.compat.v1.ConfigProto(device_count={'GPU': 1, 'CPU':1})
          if cluster_resolver.task_type in ("worker"):
              NUM_WORKERS=len(cluster_resolver.cluster_spec().job_tasks('worker'))
              if multiprocessing.cpu_count() < NUM_WORKERS + 1:
                  worker_config.inter_op_parallelism_threads = NUM_WORKERS + 1
      
          server = tf.distribute.Server(
              cluster_resolver.cluster_spec(),
              job_name=cluster_resolver.task_type,
              task_index=cluster_resolver.task_id,
              config=worker_config,
              protocol=cluster_resolver.rpc_layer or "grpc",
              start=True)
          print("cluster_resolver.task_type", cluster_resolver.task_type)
          print("cluster_resolver.task_id", cluster_resolver.task_id)
          print("cluster_resolver.rpc_layer", cluster_resolver.rpc_layer or "grpc")
          print("server.default_session_config", server.server_def.default_session_config)
          print()
          server.join()
      elif cluster_resolver.task_type == "evaluator":   # Run sidecar evaluation
          pass # note used
      else:  # Run the coordinator.
          # ---- ParameterServerStrategy object. will use all the available GPUs on each worker
          NUM_PS=len(cluster_resolver.cluster_spec().job_tasks('ps'))
          variable_partitioner = (
              tf.distribute.experimental.partitioners.MinSizePartitioner(
                  min_shard_bytes=(256 << 10),
                  max_shards=NUM_PS))
      
          strategy = tf.distribute.ParameterServerStrategy(
              cluster_resolver,
              variable_partitioner=variable_partitioner)
      
      
          # -- data
          mnist = tf.keras.datasets.mnist
          (x_train, y_train), (x_test, y_test) = mnist.load_data()
          x_train, x_test = x_train / 255.0, x_test / 255.0
      
          # -- trivial model
          with strategy.scope(): # dataset_fn will be wrapped into a tf.function and then executed on each worker to generate the data pipeline.
              # with tf.device('/device:GPU:0'):
              batch_size=32
      
              # -- Dataset TF class
              train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
      
              # suppress warning at worker, maybe fix error.
              options = tf.data.Options()
              options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
      
              train_dataset = train_dataset.with_options(options)
              train_dataset = train_dataset.shuffle(600).repeat().batch(batch_size).prefetch(300)
              train_dataset = strategy.experimental_distribute_dataset(train_dataset)
              validation_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
              validation_dataset = validation_dataset.shuffle(600).batch(batch_size)
              # -- model
              model = tf.keras.models.Sequential([
                  tf.keras.layers.Flatten(input_shape=(28, 28)),
                  tf.keras.layers.Dense(400, activation='relu'),
                  # tf.keras.layers.Dense(3420, activation='relu'),
                  # tf.keras.layers.Dense(3420, activation='relu'),
                  tf.keras.layers.Dropout(0.2),
                  tf.keras.layers.Dense(10)
              ])
      
              loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
              model.compile(optimizer='adam',
                            loss=loss_fn,
                            metrics=['accuracy'],
                            # not required: pss_evaluation_shards='auto'
                            )
              # print model
              model.summary()
      
              # -- train
              model.fit(train_dataset, epochs=5, steps_per_epoch=300)
              # -- save
              model.save('aa.keras', overwrite=True, save_format="tf")  # The file needs to end with the .keras extension
          model = tf.keras.models.load_model('aa.keras')
          # -- checks the model's performance
          model.evaluate(validation_dataset, verbose=2)
          # -- inferece
          predictions = model(x_train[:1]).numpy()
          import numpy as np
          print(np.argmax(predictions))
          print(y_train[:1])
      
      
    11. resnet
      pip3 install tf-models-official==2.13 ; apt install -y emacs-nox
      
      # Disable all GPUs. This prevents errors caused by all workers trying to use the same GPU. In a real-world application, each worker would be on a different machine.
      # import os
      # os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
      
      import tensorflow as tf
      import os
      import logging
      import multiprocessing
      
      tf.get_logger().setLevel(logging.DEBUG)
      
      # ---- who do what
      cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
      
      # -- set GPU for worker
      def set_gpu():
          gpus = tf.config.list_physical_devices('GPU')
          if gpus:
              # Restrict TensorFlow to only use the first GPU
              try:
                  for device in gpus:
                      tf.config.experimental.set_memory_growth(device, True)
                  # tf.config.set_logical_device_configuration(
                  #         gpus[0],
                  #         [tf.config.LogicalDeviceConfiguration(memory_limit=3024)])
                  gpu_devices = tf.config.experimental.list_physical_devices('GPU')
                  tf.config.set_visible_devices(gpus[0], 'GPU')
                  logical_gpus = tf.config.list_logical_devices('GPU')
                  print()
                  print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPU")
                  print()
                  cpu_ph = tf.config.list_physical_devices('CPU')
                  cpu_lg = tf.config.list_logical_devices('CPU')
                  print(len(cpu_ph), "Physical CPUs,", len(cpu_lg), "Logical CPU")
      
              except RuntimeError as e:
                  # Visible devices must be set before GPUs have been initialized
                  print(e)
      
      # if cluster_resolver.task_type in ("worker", "ps"):
      set_gpu() # for all
      
      # -- wait for task for worker and ps
      if cluster_resolver.task_type in ("worker", "ps"):
          # Start a TensorFlow server and wait.
          # Set the environment variable to allow reporting worker and ps failure to the
          # coordinator. This is a workaround and won't be necessary in the future.
          os.environ["GRPC_FAIL_FAST"] = "use_caller"
      
          # # Workers need some inter_ops threads to work properly.
          worker_config = tf.compat.v1.ConfigProto(device_count={'GPU': 1, 'CPU':1})
          if cluster_resolver.task_type in ("worker"):
              NUM_WORKERS=len(cluster_resolver.cluster_spec().job_tasks('worker'))
              if multiprocessing.cpu_count() < NUM_WORKERS + 1:
                  worker_config.inter_op_parallelism_threads = NUM_WORKERS + 1
      
          server = tf.distribute.Server(
              cluster_resolver.cluster_spec(),
              job_name=cluster_resolver.task_type,
              task_index=cluster_resolver.task_id,
              config=worker_config,
              protocol=cluster_resolver.rpc_layer or "grpc",
              start=True)
          print("cluster_resolver.task_type", cluster_resolver.task_type)
          print("cluster_resolver.task_id", cluster_resolver.task_id)
          print("cluster_resolver.rpc_layer", cluster_resolver.rpc_layer or "grpc")
          print("server.default_session_config", server.server_def.default_session_config)
          print()
          server.join()
      elif cluster_resolver.task_type == "evaluator":   # Run sidecar evaluation
          pass # note used
      else:  # Run the coordinator.
          # ---- ParameterServerStrategy object. will use all the available GPUs on each worker
          NUM_PS=len(cluster_resolver.cluster_spec().job_tasks('ps'))
          variable_partitioner = (
              tf.distribute.experimental.partitioners.MinSizePartitioner(
                  min_shard_bytes=(256 << 10),
                  max_shards=NUM_PS))
      
          strategy = tf.distribute.ParameterServerStrategy(
              cluster_resolver,
              variable_partitioner=variable_partitioner)
      
          # ---------------------------------------------------------------------------------------------------
          # ----------------------- Model, Dataset, Training --------------------------------------------------
          with strategy.scope()
              from importlib import reload
              reload("./resnet-model-and-data.py")
      
          # ------------ Part require modification for ParameterServer strategy
          train_dataset = tf.data.Dataset.from_tensor_slices((x_train.astype(str), y_train.astype(int))).skip(df.shape[0] - df.shape[0]//4)
          train_dataset = train_dataset.map(lambda x, y: encode_single_sample(x, y), tf.data.experimental.AUTOTUNE)
      
          train_dataset = train_dataset.batch(BATCH_SIZE).prefetch(100)
      
          validation_dataset = tf.data.Dataset.from_tensor_slices((x_valid.astype(str), y_valid.astype(int))).skip(df.shape[0] - df.shape[0]//4)
          validation_dataset = validation_dataset.map(lambda x, y: encode_single_sample(x, y), tf.data.experimental.AUTOTUNE)
          validation_dataset = train_dataset.prefetch(100)
      
          # ---- train ----
          model.fit(train_dataset, epochs=1)
      # -- checks the model's performance
      print("evaluate")
      model.evaluate(validation_dataset, verbose=2)
      # -- inferece
      print("inference", x_valid[0], y_valid[0])
      im, l = encode_single_sample(x_valid[0], y_valid[0])
      im = tf.expand_dims(im, axis=0)
      print("im", im.shape)
      predictions = model.predict(im, batch_size=1)
      print(np.argmax(predictions))
      print("label:", y_valid[0])
      
      
    12. Variable sharding

      for very large embeddings that may not fit in a single machine's memory

    13. TF_CONFIG

      'TF_CONFIG' environment variable if you use TFConfigClusterResolver.

    14. logging steps
      train_step = model.train_step
      def my_train_step(data):
          tf.print("step:", model._train_counter)
          return train_step(data)
      model.train_step = my_train_step
      
    15. troubleshooting

      auto_shard.cc: AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"

      Attempting to perform BLAS operation using StreamExecutor without BLAS support" error occurs

      • tf.config.set_logical_device_configuration(gpus[0],[tf.config.LogicalDeviceConfiguration(memory_limit=1024)])

      NOT_FOUND: TensorFlow device GPU:1 was not registered - several times after start

      • all pods should have equal amount of GPU:
        • in YAML: resources: limits: nvidia.com/gpu: 1
        • tf.compat.v1.ConfigProto(device_count={'GPU': 1, 'CPU':1}) - for all pods

      SessionOptions: device_count{key: "CPU", value:1,}, device_count{key: "GPU", value:0,}

      • enable GPU at chief and PS

      Successful NUMA node read from SysFS had negative value (-1)

    16. links

10.40.5. TF_CONFIG

'TF_CONFIG' environment variable is a JSON string

  • what tasks constitute a cluster
  • their addresses
  • each task's role in the cluster

10.40.6. data sharding

tf.data.experimental.AutoShardPolicy

  • AUTO or FILE - tf.data.Dataset that reads from files.

Note: tf.data.experimental.AutoShardPolicy.FILE - the actual per-step batch size may be smaller than the one you defined for the global batch size - when the remaining elements in the file are less than the global batch size

10.40.8. monitor

  1. chargpt
    1. TensorFlow Extended (TFX): TFX provides a comprehensive end-to-end pipeline for building,

    training, and deploying machine learning models. It includes components for monitoring the model training process and tracking model metrics during training.

    1. TensorBoard: TensorBoard is a web-based tool provided by TensorFlow that allows you to

    visualize and monitor various aspects of your model training, such as loss, accuracy, and computational graphs. It can be integrated with Kubernetes to monitor the training process running on the cluster.

    1. Kubernetes Dashboard: The Kubernetes dashboard is a web-based user interface that provides a

    visual representation of the cluster, including information about deployments, pods, jobs, and other resources. It can be used to monitor the status and progress of the neural network training on the Kubernetes cluster.

    1. Prometheus and Grafana: Prometheus is a popular open-source monitoring and alerting platform

    that can be used to collect and store metrics from your TensorFlow cluster. Grafana is a visualization and analytics tool that can be integrated with Prometheus to create customizable dashboards for monitoring and analyzing training metrics.

    1. KubeFlow: KubeFlow is an open-source project that provides a platform for end-to-end machine

    learning workflows on Kubernetes. It includes components for model training, hyperparameter tuning, model packaging, and serving. KubeFlow also provides monitoring capabilities to track the progress of your model training and performance metrics.

  2. TODO tensorboard

10.41. toy model MNIST

#+NAME https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras

import tensorflow as tf
mnist = tf.keras.datasets.mnist

(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
# -- dataset
batch_size=16
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(60000).repeat().batch(batch_size)
validation_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
# -- model
model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(input_shape=(28, 28)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(10)
])

loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer='adam',
              loss=loss_fn,
              metrics=['accuracy'])

# -- train
# model.fit(x_train, y_train, epochs=5)
model.fit(train_dataset, epochs=5, steps_per_epoch=200)
# -- checks the model's performance
model.evaluate(x_test,  y_test, verbose=2)
# -- inferece
predictions = model(x_train[:1]).numpy()
import numpy as np
print(np.argmax(predictions))
print(y_train[:1])

10.42. logging

https://stackoverflow.com/questions/40559667/how-to-redirect-tensorflow-logging-to-a-file

tf.keras.utils.enable_interactive_logging() When interactive logging is enabled, Keras displays logs via stdout. This provides the best experience when using Keras in an interactive environment such as a shell or a notebook.

tensor:

  • tf.debugging
  • tf.print

log:

  • tf.get_logger() return logging.getLogger('tensorflow')

10.42.2. pipe

script allow get full output

script -c 'python -i <<< "print \"test\""'

freezing at tree: disable buffering:

  • sed -u
  • grep –line-buffered
  • perl -ne 'use IO::Handle ; printf "%s %s", scalar time(), $_ ; STDOUT->autoflush(1)'

10.42.3. logging

import logging

log = logging.getLogger('tensorflow') log.setLevel(logging.DEBUG)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

fh = logging.FileHandler('tensorflow.log') fh.setLevel(logging.DEBUG) fh.setFormatter(formatter) log.addHandler(fh)

10.43. callbacks for model.fit

10.44. USE CASES

10.44.1. TF 2.0 convert mode h5 to weight and arch

from tensorflow import keras
from tensorflow.keras.models import Model
import os
# use CPU
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
# parent_path = os.path.join(os.getcwd(), os.pardir)
model_path = '/mnt/hit4/hit4user/PycharmProjects/cnn/text_or_not/saved_models/cnn_trained_model2020-09-10 09:26:34.553480.h5'
print(model_path)

model: Model = keras.models.load_model(model_path)
import time

name = 'cnn_trained_model2020-09-10 09:26:34.553480'
os.mkdir(name)


with open("./"+name+"/model_to_json.json", "w") as json_file:
    json_file.write(model.to_json(indent=4))

model.save_weights('./'+name+'/')
print("ok")
time.sleep(1)

10.44.2. imbalanced dataset

strategy:

  • oversample min to half of max
  • apply class_weight
  1. class_weight

    for binary:

    weight_for_0 = (1 / neg) * (total / 2.0)
    weight_for_1 = (1 / pos) * (total / 2.0)
    class_weight = {0: weight_for_0, 1: weight_for_1}
    

    for n-classes:

    n_samples / (n_classes * np.bincount(y))
    
    • n_samples is the total number of instances
    • n_classes is the number of classes
    • np.bincount(y) is an array of the number of instances in each class

    apply weights:

    
    
    n_classes = sorted(set(y))
    n_samples = len(xy)
    n_samples / (n_classes * np.bincount(y))
    model.fit(class_weight=class_weight)
    
    y = [0]*5 + [1]*2 + [2]*5
    y = np.array(y)
    x = np.array(list(range(len(y))))
    xy= np.vstack([x,y]).transpose()
    # print(xy)
    classes = sorted(set(y))
    n_classes = len(classes)
    n_samples = len(xy)
    print(n_samples)
    print(n_classes)
    print(np.bincount(y))
    import numpy as np
    y = np.array(y)
    weights = n_samples / (n_classes * np.bincount(y))
    class_weight = {c:w for c,w in zip(classes, weights)}
    print(class_weight)
    
    12
    3
    [5 2 5]
    {0: 0.8, 1: 2.0, 2: 0.8}
    
    import numpy as np
    y = [0]*100 + [1]*10 + [2]*300
    u = sorted(set(y))
    n_classes=3
    n_samples=len(y)
    w = n_samples / (n_classes * np.bincount(y))
    class_weight = {x:y for x, y in zip(u, w)}
    print(np.bincount(y), "- np.bincount(y) first sort ASC")
    print("unique", u)
    print(class_weight)
    
    [100  10 300] - np.bincount(y) first sort ASC
    unique [0, 1, 2]
    {0: 1.3666666666666667, 1: 13.666666666666666, 2: 0.45555555555555555}
    
  2. numpy choose, oversampling
    1. 1
      import numpy as np
      y = [0]*100 + [1]*10 + [2]*300
      u = sorted(set(y))
      print(np.bincount(y), "- np.bincount(y) first sort ASC")
      # -- oversampling
      distrib = np.bincount(y)
      prob = 1/distrib[y].astype(float)
      prob /= prob.sum()
      
      print("distrib =", distrib, distrib[y])
      print("a =", np.arange(len(y)))
      print("count after(size) =", np.count_nonzero(distrib)*distrib.max())
      print("prob =", prob)
      sel = np.random.choice(np.arange(len(y)), size=np.count_nonzero(distrib)*distrib.max(), p=prob).astype(int)
      y = np.array(y)
      print(y[np.random.choice(np.arange(len(y)), size=np.count_nonzero(distrib)*distrib.max(), p=prob)])
      print(np.bincount(y[sel]))
      
      [100  10 300] - np.bincount(y) first sort ASC
      distrib = [100  10 300] [100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100
       100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100
       100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100
       100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100
       100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100
       100 100 100 100 100 100 100 100 100 100  10  10  10  10  10  10  10  10
        10  10 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300
       300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300
       300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300
       300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300
       300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300
       300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300
       300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300
       300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300
       300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300
       300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300
       300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300
       300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300
       300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300
       300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300
       300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300
       300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300 300
       300 300 300 300 300 300 300 300 300 300 300 300 300 300]
      a = [  0   1   2   3   4   5   6   7   8   9  10  11  12  13  14  15  16  17
        18  19  20  21  22  23  24  25  26  27  28  29  30  31  32  33  34  35
        36  37  38  39  40  41  42  43  44  45  46  47  48  49  50  51  52  53
        54  55  56  57  58  59  60  61  62  63  64  65  66  67  68  69  70  71
        72  73  74  75  76  77  78  79  80  81  82  83  84  85  86  87  88  89
        90  91  92  93  94  95  96  97  98  99 100 101 102 103 104 105 106 107
       108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
       126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
       144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
       162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
       180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
       198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
       216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
       234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251
       252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
       270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
       288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
       306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323
       324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341
       342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
       360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
       378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395
       396 397 398 399 400 401 402 403 404 405 406 407 408 409]
      count after(size) = 900
      prob = [0.00333333 0.00333333 0.00333333 0.00333333 0.00333333 0.00333333
       0.00333333 0.00333333 0.00333333 0.00333333 0.00333333 0.00333333
       0.00333333 0.00333333 0.00333333 0.00333333 0.00333333 0.00333333
       0.00333333 0.00333333 0.00333333 0.00333333 0.00333333 0.00333333
       0.00333333 0.00333333 0.00333333 0.00333333 0.00333333 0.00333333
       0.00333333 0.00333333 0.00333333 0.00333333 0.00333333 0.00333333
       0.00333333 0.00333333 0.00333333 0.00333333 0.00333333 0.00333333
       0.00333333 0.00333333 0.00333333 0.00333333 0.00333333 0.00333333
       0.00333333 0.00333333 0.00333333 0.00333333 0.00333333 0.00333333
       0.00333333 0.00333333 0.00333333 0.00333333 0.00333333 0.00333333
       0.00333333 0.00333333 0.00333333 0.00333333 0.00333333 0.00333333
       0.00333333 0.00333333 0.00333333 0.00333333 0.00333333 0.00333333
       0.00333333 0.00333333 0.00333333 0.00333333 0.00333333 0.00333333
       0.00333333 0.00333333 0.00333333 0.00333333 0.00333333 0.00333333
       0.00333333 0.00333333 0.00333333 0.00333333 0.00333333 0.00333333
       0.00333333 0.00333333 0.00333333 0.00333333 0.00333333 0.00333333
       0.00333333 0.00333333 0.00333333 0.00333333 0.03333333 0.03333333
       0.03333333 0.03333333 0.03333333 0.03333333 0.03333333 0.03333333
       0.03333333 0.03333333 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111 0.00111111 0.00111111 0.00111111 0.00111111
       0.00111111 0.00111111]
      [2 0 0 2 1 1 2 1 2 0 0 2 1 0 0 0 2 0 1 1 1 2 0 2 1 1 2 2 0 1 0 2 0 1 0 2 0
       2 1 1 0 1 1 1 1 0 2 2 2 1 0 2 0 1 0 0 1 2 1 0 2 1 2 1 0 0 1 1 2 1 2 2 1 2
       2 0 2 1 0 1 1 1 0 0 0 1 2 1 2 1 0 2 2 1 1 2 1 2 1 1 0 1 0 2 2 1 2 2 2 1 2
       0 0 0 0 2 1 2 1 0 0 1 2 2 2 1 2 1 0 0 1 2 1 2 0 0 0 0 2 0 2 1 2 2 2 2 0 2
       2 1 0 0 0 0 2 1 1 2 1 1 0 2 2 2 0 2 2 1 2 2 2 2 2 2 1 0 2 2 0 2 0 1 1 2 2
       2 1 2 1 2 0 1 1 1 1 1 2 0 0 0 1 2 2 2 1 2 2 1 2 1 2 1 2 0 0 0 2 1 2 1 1 2
       0 1 2 2 0 2 1 1 0 2 0 2 0 1 0 0 2 0 2 0 1 0 2 0 1 2 2 0 0 0 1 0 0 1 0 0 0
       2 1 0 2 0 1 2 0 0 1 0 1 1 0 1 2 2 1 0 0 1 0 2 2 2 0 0 2 2 1 2 0 2 1 0 2 0
       2 0 1 0 1 1 0 2 0 1 1 1 0 0 0 0 1 0 1 0 1 2 0 0 0 0 2 0 0 0 2 0 1 2 0 2 1
       1 1 0 1 0 2 1 0 2 1 0 2 1 2 2 0 2 0 1 2 0 1 1 1 2 2 0 0 2 0 1 1 0 2 1 2 1
       0 0 2 1 0 2 0 2 0 2 2 0 0 1 2 0 2 0 1 1 1 0 2 1 2 2 1 0 1 2 0 2 2 1 2 1 2
       0 1 1 2 2 2 1 1 1 1 2 1 0 0 1 1 1 2 2 1 0 0 0 2 1 1 2 0 1 2 1 0 0 2 1 0 2
       2 1 2 0 1 1 0 1 1 1 0 1 2 2 2 2 1 1 2 0 1 1 1 1 0 2 2 0 2 0 2 1 2 1 2 2 0
       1 2 1 0 0 0 0 1 1 1 0 2 0 2 0 2 1 1 2 2 1 1 2 2 1 2 2 1 1 0 2 1 2 0 1 0 1
       0 2 0 2 1 2 2 0 1 2 1 1 1 1 2 0 0 0 2 1 2 2 0 1 2 1 2 0 0 0 2 1 2 0 1 0 0
       0 1 0 2 1 0 0 2 1 1 1 1 0 1 2 2 2 1 2 2 2 0 0 0 0 0 1 1 2 0 2 0 1 0 0 0 1
       1 0 2 2 2 2 0 2 1 2 1 1 1 1 2 2 0 0 1 1 0 0 2 2 2 0 1 2 2 1 0 2 1 1 0 0 1
       0 1 2 2 1 1 0 0 1 0 2 1 2 2 1 1 2 1 2 2 2 2 0 2 1 1 0 1 1 2 1 1 1 0 0 0 0
       2 2 1 0 2 2 1 1 0 2 1 2 2 0 2 0 0 2 0 1 2 2 1 0 1 2 2 0 0 1 1 0 2 0 2 2 1
       0 2 2 2 0 0 1 0 2 2 1 1 1 2 0 1 2 0 0 1 2 0 1 0 2 1 0 1 1 2 0 1 2 2 2 2 2
       2 2 0 2 0 0 1 1 1 0 0 2 0 1 0 0 1 1 2 0 2 0 1 1 2 0 1 0 1 2 1 1 0 2 0 0 0
       0 0 1 2 0 0 1 0 0 0 1 0 1 0 0 1 1 0 2 2 2 2 2 0 1 0 0 0 1 2 2 0 0 1 1 0 1
       0 0 0 1 1 2 0 0 2 0 0 0 1 0 1 2 2 2 0 1 1 0 0 2 0 0 1 0 0 1 1 2 2 2 1 1 2
       1 1 2 2 1 1 1 0 0 1 1 1 0 1 0 0 1 1 1 1 2 0 2 1 1 1 0 0 2 1 2 1 1 2 0 1 0
       2 2 2 0 0 0 0 0 0 2 2 1]
      [328 284 288]
      
    2. 2
      1. simple 1d arrays
        import numpy as np
        y = [0]*5 + [1]*2 + [2]*10
        y = np.array(y)
        x = np.array(list(range(len(y))))
        xy= np.vstack([x,y]).transpose()
        # ---------------------
        unq, unq_idx = np.unique(y, return_inverse=True)
        print("unq, unq_idx", unq, unq_idx)
        unq_cnt = np.bincount(unq_idx)
        print("unq_cnt", unq_cnt)
        min = np.min(unq_cnt)
        max = np.max(unq_cnt)
        print("max", max, "min", min)
        # cnt = round((max - min)/2 + min)
        cnt = max
        print("cnt", cnt)
        print("y.shape[1:]", y.shape[1:])
        out = np.empty((cnt*len(unq) - len(y),), y.dtype)
        # # out = np.empty((cnt*len(unq) - len(xy),) + xy.shape[1:], xy.dtype)
        print("out.shape", out.shape, "xy.shape", xy.shape)
        slices = np.concatenate(([0], np.cumsum(cnt - unq_cnt)))
        print(slices)
        for j in range(len(unq)):
            indices = np.random.choice(np.where(unq_idx==j)[0], cnt - unq_cnt[j])
            print("indices", indices)
            out[slices[j]:slices[j+1]] = y[indices]
            print("out", out)
        # out = np.hstack((y, out))
        print(out)
        print(np.bincount(out), "- np.bincount(out) first sort ASC")
        
        unq, unq_idx [0 1 2] [0 0 0 0 0 1 1 2 2 2 2 2 2 2 2 2 2]
        unq_cnt [ 5  2 10]
        max 10 min 2
        cnt 10
        y.shape[1:] ()
        out.shape (13,) xy.shape (17, 2)
        [ 0  5 13 13]
        indices [0 4 0 4 4]
        out [                 0                  0                  0
                          0                  0    140160696704256
            140160713380912    140160696704416    140160696541680
             94915202709280                  0 172834964494878845
                        240]
        indices [6 6 5 6 5 5 5 6]
        out [0 0 0 0 0 1 1 1 1 1 1 1 1]
        indices []
        out [0 0 0 0 0 1 1 1 1 1 1 1 1]
        [0 0 0 0 0 1 1 1 1 1 1 1 1]
        [5 8] - np.bincount(out) first sort ASC
        
      2. simple xy
        import numpy as np
        y = [0]*5 + [1]*2 + [2]*10
        y = np.array(y)
        x = np.array(list(range(len(y))))
        xy= np.vstack([x,y]).transpose()
        # ---------------------
        unq, unq_idx = np.unique(y, return_inverse=True)
        print("unq, unq_idx", unq, unq_idx)
        unq_cnt = np.bincount(unq_idx)
        print("unq_cnt", unq_cnt)
        cnt = np.max(unq_cnt)
        print("cnt", cnt)
        print("y.shape[1:]", y.shape[1:])
        # out = np.empty((cnt*len(unq) - len(y),), y.dtype)
        out = np.empty((cnt*len(unq) - len(xy),) + xy.shape[1:], xy.dtype)
        print("out.shape", out.shape, "xy.shape", xy.shape)
        slices = np.concatenate(([0], np.cumsum(cnt - unq_cnt)))
        print(slices)
        for j in range(len(unq)):
            indices = np.random.choice(np.where(unq_idx==j)[0], cnt - unq_cnt[j])
            print("indices", indices)
            out[slices[j]:slices[j+1]] = xy[indices]
            print("out", out)
        # out = np.hstack((y, out))
        print(out)
        # print(np.bincount(out), "- np.bincount(out) first sort ASC")
        
        unq, unq_idx [0 1 2] [0 0 0 0 0 1 1 2 2 2 2 2 2 2 2 2 2]
        unq_cnt [ 5  2 10]
        cnt 10
        y.shape[1:] ()
        out.shape (13, 2) xy.shape (17, 2)
        [ 0  5 13 13]
        indices [2 3 3 2 4]
        out [[2 0]
         [3 0]
         [3 0]
         [2 0]
         [4 0]
         [0 0]
         [0 0]
         [0 0]
         [0 0]
         [0 0]
         [0 0]
         [0 0]
         [0 0]]
        indices [6 6 6 5 5 5 5 6]
        out [[2 0]
         [3 0]
         [3 0]
         [2 0]
         [4 0]
         [6 1]
         [6 1]
         [6 1]
         [5 1]
         [5 1]
         [5 1]
         [5 1]
         [6 1]]
        indices []
        out [[2 0]
         [3 0]
         [3 0]
         [2 0]
         [4 0]
         [6 1]
         [6 1]
         [6 1]
         [5 1]
         [5 1]
         [5 1]
         [5 1]
         [6 1]]
        [[2 0]
         [3 0]
         [3 0]
         [2 0]
         [4 0]
         [6 1]
         [6 1]
         [6 1]
         [5 1]
         [5 1]
         [5 1]
         [5 1]
         [6 1]]
        
      3. full
        import numpy as np
        # ---------------------
        def calc_oversampl(xy):
            unq, unq_idx = np.unique(xy[:, -1], return_inverse=True)
            unq_cnt = np.bincount(unq_idx)
            cnt = np.max(unq_cnt)
            out = np.empty((cnt*len(unq) - len(xy),) + xy.shape[1:], xy.dtype)
            slices = np.concatenate(([0], np.cumsum(cnt - unq_cnt)))
            for j in range(len(unq)):
                indices = np.random.choice(np.where(unq_idx==j)[0], cnt - unq_cnt[j])
                out[slices[j]:slices[j+1]] = xy[indices]
                # print(out)
            return np.vstack((xy, v))
        
        out = [0]*5 + [1]*2 + [2]*1
        v = np.array(v)
        x = np.array(list(range(len(v))))
        xy= np.vstack([x,v]).transpose()
        print(xy)
        print(np.bincount(xy[:,1]))
        out = calc_oversampl(xy)
        # print(out)
        print(np.bincount(out[:,1]))
        
        [[0 0]
         [1 0]
         [2 0]
         [3 1]
         [4 0]
         [5 1]
         [6 2]
         [7 0]
         [8 0]
         [9 0]]
        [7 2 1]
        [7 7 7]
        
      4. half
        import numpy as np
        def oversample(xy, maxc=None):
            unq, unq_idx = np.unique(xy[:, -1], return_inverse=True)
            unq_cnt = np.bincount(unq_idx)
            if maxc:
                cnt = maxc
            else:
                cnt = np.max(unq_cnt)
            out = np.empty((cnt*len(unq) - len(xy),) + xy.shape[1:], xy.dtype)
            slices = np.concatenate(([0], np.cumsum(cnt - unq_cnt)))
            for j in range(len(unq)):
                indices = np.random.choice(np.where(unq_idx==j)[0], cnt - unq_cnt[j])
                out[slices[j]:slices[j+1]] = xy[indices]
            return np.vstack((xy, out))
        
        
        def oversamples_half(xy):
            # - separate part of xy with classes which count of examples > max(count of examples)//2
            unq, unq_idx = np.unique(xy[:, -1].astype(int), return_inverse=True)
            unq_cnt = np.bincount(unq_idx)
            cnt_half = np.max(unq_cnt) //2
            use_u = unq[unq_cnt<cnt_half]
            use_i = np.vectorize(lambda x: x in use_u)(xy[:,-1])
            use = xy[use_i]
            not_use = xy[~use_i]
            # print("use", np.bincount(use[:,1].astype(int)))
            out = oversample(use, maxc=cnt_half)
            # print("out", np.bincount(out[:,1].astype(int)))
            return np.vstack((out, not_use))
        
        xy = np.array(
        [[0,0],
        [1,0],
        [2,0],
        [3,1],
        [4,0],
        [5,1],
        [6,3],
        [7,0],
        [8,0],
        [9,0]]
        )
        # xy[:,1].astype(int)
        print(np.bincount(xy[:,1].astype(int)))
        out = calc_oversamples_half(xy)
        print(np.bincount(out[:,1].astype(int)))
        print(out)
        # print(np.bincount(out[:,1].astype(int)))
        
        [7 2 0 1]
        use [0 2 0 1]
        out [0 3 0 3]
        [7 3 0 3]
        [[3 1]
         [5 1]
         [6 3]
         [3 1]
         [6 3]
         [6 3]
         [0 0]
         [1 0]
         [2 0]
         [4 0]
         [7 0]
         [8 0]
         [9 0]]
        

10.45. common errors:

ValueError: Input 0 of layer "model" is incompatible with the layer: expected shape=(None, 200, 60, 1), found shape=(None, 60, 1)

  • print(type(input))
  • input: class =
  • tf.expand_dims(encsample["image"], axis=0)

tf.data.Dataset data = next(iterator) Cannot add tensor to the batch: number of elemets does not match. Shapes are: [tensor]: [4], [batch]: [5]

  • solutions:
    • .padded_patch
    • .apply(tf.data.experimental.dense_to_ragged_batch(…))

11. PyTorch

data_science#MissingReference install: https://pytorch.org/get-started/locally/ examples https://github.com/pytorch/examples/

  • GPU Tensors, Dynamic Neural Networks and deep Python integration
  • This is closer to writing code in any language as a for loop in code will behave as a for loop inside the graph structure as well.
  • TensorFlow doesn’t handle dynamic graphs very well though there are some not so flexible and frankly quite limiting primitive dynamic constructs.
  • Intel MKL and NVIDIA (CuDNN, NCCL) support
  • have their own official model repositories,

PyTorch:

  • replacement for NumPy to use the power of GPUs
  • deep learning research platform

HuggingFace: most models Pytorch

11.1. install

May 8, 2023

pip3 install torch==2.0.1 torchvision==0.15.2 torchaudio==2.0.2 --index-url https://download.pytorch.org/whl/cu118

11.2. history

  • 2002 - Torch (picked up by Facebook AI Research). Lua + C. three key features:
    • ease the development of numerical algorithms.
    • easily extended
    • fast
  • 2017 PyTorch beta.
  • Caffe2 was merged into PyTorch at the end of March 2018
  • 1.13
    • BetterTransformer supports fastpath execution for common Transformer models during Inference out-of-the-box, without the need to modify the model.
    • Functorch now in PyTorch Core Library - composable vmap (vectorization) and autodiff transforms.
  • PyTorch 2.0 has been released on 15 March 2023 (2-series)
  • PyTorch 2.2 SDPA FlashAttention-2, TorchInductor, device_mesh, TORCH_LOGS.

11.2.1. PyTorch 2.0

  • fundamentally changing and supercharging how PyTorch operates at compiler level under the hood.
  • faster performance and support for Dynamic Shapes and Distributed.
  • torch.compile - from C++ back into Python - additive (and optional) feature
  • 2.0 is 100% backward compatible

TorchDynamo

AOTAutograd

PrimTorch

TorchInductor

Compilation steps:

  • graph acquisition - TorchDynamo + AOTAutograd
  • graph lowering - ATen/ Prim IR
  • graph compilation - TorchInductor(default) powered by Triton. Features:
    • your own backend
    • nvFuser
    • TVM
    • XLA
    • AITemplate
    • TensorRT

11.2.2. FlashAttention-2 - approximate attention method

FlashAttention: Fast and Memory-Efficient Exact Attention with IO-Awareness

Transformers: time and memory complexity of self-attention are quadratic in sequence length.

FlashAttention trains Transformers faster than existing baselines: 15% end-to-end wall-clock speedup on BERT-large

11.3. deployment

  • TorchServe
    • endpoint specification, model archiving, and observing metrics
    • provide REST and gRPC APIs
    • still in its infancy
  • PyTorch Live - build upon old PyTorch Mobile
    • uses JavaScript and React Native to create cross-platform iOS and Android AI-powered apps
    • focuses on mobile only

11.4. ecosystem

https://pytorch.org/ecosystem/

11.5. PyTorch 2.0

https://pytorch.org/get-started/pytorch-2.0

features:

  • model compilation or compiled mode - wraps your model and returns a compiled model.
    • will allow models to be ahead-of-time compiled for lightning-fast execution.
    • compiles the forward function to a more optimized version.
    • When compiling the model, we give a few knobs to adjust it.
    • drop-in replacement for torch.jit.script()
  • make distributed training simpler too
  • TorchDynamo allow access model attributes like weight and modify them.

famous models:

  • DALL-E 2
  • Stable Diffusion
  • ChatGPT.

torch.distributed

  • DistributedDataParallel (DDP) - relies on overlapping AllReduce communications with backwards computation
  • FullyShardedDataParallel (FSDP) - “beta”

11.6. device

import torch

# Set the device
device = "cuda" if torch.cuda.is_available() else "cpu"
# Set the device globally
torch.set_default_device(device)

if device == "cuda":
    GPU_SCORE = torch.cuda.get_device_capability()
    # optimization - perform faster matrix multiplications
    if GPU_SCORE >= (8, 0):
        print(f"[INFO] Using GPU with score: {GPU_SCORE}, enabling TensorFloat32 (TF32) computing (faster on new GPUs)")
        torch.backends.cuda.matmul.allow_tf32 = True
    else:
        print(f"[INFO] Using GPU with score: {GPU_SCORE}, TensorFloat32 (TF32) not available, to use it you need a GPU with score >= (8, 0)")
        torch.backends.cuda.matmul.allow_tf32 = False


11.7. models - torchvision.models

import torchvision.models as models
# from torchvision.models import resnet50
resnet = models.resnet50(weights=None) # random initialization

Torch Hub

import torch

# Option 1: passing weights param as string
model = torch.hub.load("pytorch/vision", "resnet50", weights="IMAGENET1K_V2")

11.8. nn.Module

  • model.parameters() - the learnable parameters (i.e. weights and biases
  • model.state_dict() is simply a Python dictionary object that maps each layer to its parameter tensor.

11.8.1. nn.Linear

y = x*(A^T) + b , idk why ^T

import numpy as np
m = np.random.random((2,3)) # Linear(in_features=2, out_features=5)
input = np.random.random((10,2))
print(np.matmul(input,m).shape)
(10, 3)

11.9. Dataset and DataLoader, transform

  • Dataset - retrieves our dataset’s features and labels one sample at a time.

    • from torch.utils.data import Dataset (must be created)
    • Dataset - map-style datasets, - __getitem__() and __len__(), accessible with dataset[idx]
    • IterableDataset - iterable-style datasets. - __iter__() - when called iter(dataset), could return a stream

    of data reading from a database, a remote server, or even logs generated in real time.

    • multi-process data loading.
  • DataLoader - minibatches, reshuffle the data at every epoch to reduce model overfitting, and use Python’s multiprocessing to speed up data retrieval.
    • Dataset -> Sampler -> BatchSampler + Dataset -> Data batch
    • from torch.utils.data import DataLoader (accept Dataset as constructor argument)

samplers is to determine how batches should be formed. they are passed to a PyTorch Dataloader

  • When the dataloader is initialized, the sampler is also passed to it ( RandomSampler by default) which first create the sequence order in which the the samples in dataset is accessed using index.ie (1,2,3..N) where N = size of the dataset.

test Dataset:

img, lab = train_dataset.__getitem__(0)

test DataLoader:

img, lab = iter(train_loader).next()

Trnasform - part of Dataset implementation, applyed in __getitem__()

sample = self.transform(sample) ; return sample

Approach 2):

  • train_dataset = torchvision.datasets.ImageFolder(root='aa/train', transform=MyTransform)

11.9.1. code

import torch
import torchvision.models as models
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision.io import read_image
from torchvision import transforms

IMG_WIDTH = 64
IMG_HEIGHT = 64

# - image format
default_float_dtype = torch.get_default_dtype()


class LandmarkDataset(Dataset):
    def __init__(self, paths, labels, transform=None, target_transform=None):
        self.paths = paths
        self.labels = labels
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        image = read_image(self.paths[idx])
        image = image.to(dtype=default_float_dtype).div(255)
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label

def main():
    x_train, y_train = get_dataset()
    data_transform = transforms.Compose([
        transforms.RandomResizedCrop((IMG_HEIGHT, IMG_WIDTH)),
        # transforms.ToTensor()  # to [0.0, 1.0]
        ])
    train_dataset: Dataset = LandmarkDataset(x_train, y_train,
                                             transform=data_transform)
    train_loader: DataLoader = DataLoader(train_dataset)
    # img, lab = train_dataset.__getitem__(0)
    img, lab = next(iter(train_loader))
    print(img, lab)

11.10. Built-in datasets

all datasets return PIL Image: Image.fromarray(img.numpy(), mode="L")

  • from PIL import Image

training.pt We no longer cache the data in a custom binary, but simply read from the raw data directly.

11.11. train

from datetime import datetime
import torch
import torchvision.models as models
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision.io import read_image
from torchvision import transforms


class LandmarkDataset(Dataset):
    def __init__(self, paths, labels, transform=None, target_transform=None):
        self.paths = paths
        self.labels = labels
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        image = read_image(self.paths[idx])
        image = image.to(dtype=default_float_dtype).div(255)
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        # return image, label
        return image.to(device), torch.tensor(label, dtype=torch.long).to(device)

def train_one_epoch(epoch_index, training_loader, optimizer, model, loss_fn, tb_writer=None):
    """ training_loader is (inputs, labels) """
    running_loss = 0.
    last_loss = 0.
    avg_loss = 0.

    for i, data in enumerate(training_loader):
        inputs, labels = data
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        if i % 10 == 9:
            avg_loss = running_loss / (1 if i // 10 == 0 else i // 10)
            print('  batch {} loss: {}'.format(i + 1, round(avg_loss,2)))
            # tb_x = epoch_index * len(training_loader) + i + 1
            # tb_writer.add_scalar('Loss/train', last_loss, tb_x)
            # running_loss = 0.

    return avg_loss

def train(model, training_loader, validation_loader, loss_fn, wirter=None): # require import datetime
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    # writer = SummaryWriter('runs/fashion_trainer_{}'.format(timestamp))
    epoch_number = 0
    EPOCHS = 2
    best_vloss = 1_000_000.

    for epoch in range(EPOCHS):
        print('EPOCH {}:'.format(epoch_number + 1))
        # ---- train ----
        model.train(True)
        avg_loss = train_one_epoch(epoch_number,
                                   training_loader=training_loader,
                                   # optimizer=torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9),
                                   optimizer=torch.optim.Adam(model.parameters()),
                                   model=model,
                                   loss_fn=loss_fn,
                                   tb_writer=None)

        running_vloss = 0.0
        # ---- validate ----
        model.eval()

        # - Disable gradient computation and reduce memory consumption.
        with torch.no_grad():
            for i, vdata in enumerate(validation_loader):
                vinputs, vlabels = vdata
                voutputs = model(vinputs)
                vloss = loss_fn(voutputs, vlabels)
                running_vloss += vloss

        avg_vloss = running_vloss / (i + 1)
        print('LOSS train {} valid {}'.format(avg_loss, avg_vloss))

        # writer.add_scalars('Training vs. Validation Loss',
        #                 { 'Training' : avg_loss, 'Validation' : avg_vloss },
        #                 epoch_number + 1)
        # writer.flush()

        if avg_vloss < best_vloss:
            best_vloss = avg_vloss
            model_path = 'model_{}_{}'.format(timestamp, epoch_number)
            torch.save(model.state_dict(), model_path)  # save the model's state

        epoch_number += 1

def create_model(classes) -> torch.nn.Module:
    resnet = models.resnet50(weights=None)
    num_ftrs = resnet.fc.in_features
    resnet.fc = torch.nn.Linear(num_ftrs, out_features=classes)
    return resnet


def main():
    x_train, x_valid, y_train, y_valid, OUTPUT_SIZE = get_dataset()
    data_transform = transforms.Compose([
        transforms.RandomResizedCrop((IMG_HEIGHT, IMG_WIDTH)),
        # transforms.ToTensor()  # to [0.0, 1.0]
        ])
    train_dataset: Dataset = LandmarkDataset(x_train, y_train,
                                             transform=data_transform)
    from torch.utils.data.dataloader import default_collate
    generator = torch.Generator(device=device)
    train_loader: DataLoader = DataLoader(train_dataset,
                                          shuffle=True, batch_size=BATCH_SIZE,
                                          generator=generator)  # , pin_memory_device=device, pin_memory=True

    # collate_fn=lambda x: (default_collate(x[0]).to(device), default_collate(torch.from_numpy(x[1])).to(device))
    valid_dataset: Dataset = LandmarkDataset(x_valid, y_valid,
                                             transform=data_transform)
    valid_loader: DataLoader = DataLoader(valid_dataset)
    # img, lab = train_dataset.__getitem__(0)
    # img, lab = next(iter(train_loader))
    # print(img, lab)
    # -- train
    model: torch.nn.Module = create_model(OUTPUT_SIZE)  # load model definition
    print(model)
    train(model, training_loader=train_loader, validation_loader=valid_loader,
          loss_fn=torch.nn.CrossEntropyLoss())
    # -- save, load and inference
    import os
    PATH = os.path.join(os.getcwd(), 'savedmodel')
    torch.save(model.state_dict(), PATH)

11.12. train (old)

data, target = data.to(device), target.to(device)

optimizer.zero_grad()

output = model(data)

loss = F.nll_loss(output, target)

loss.backward(retain_graph=True)

optimizer.step()

When we call loss.backward() - all Tensors in the graph that has requires_grad=True will have their .grad Tensor accumulated with the gradient.

11.13. loss, inference, accuracy

import torch
loss = torch.nn.CrossEntropyLoss()
input = torch.randn(3, 5, requires_grad=True)
target = torch.empty(3, dtype=torch.long).random_(5)
output = loss(input, target)
output.backward()
print(output)


# after save:
model = create_model(OUTPUT_SIZE)
model.load_state_dict(torch.load(PATH))
model.eval()
# -- inference
img, lab = next(iter(DataLoader(valid_dataset, shuffle=True, batch_size=1
                                ,generator=generator
                                )))  # get random item
print("lab", lab)
result: torch.Tensor = model(img)
import numpy as np
print("result", np.argmax(result.cpu().detach().numpy()))

Accuracy:

import torch
target = torch.tensor([0, 1, 1])
preds = torch.tensor([[0.1, 0.9, 0], [0.3, 0.1, 0.6], [0.2, 0.5, 0.3]])
accuracy = torch.metrics.Accuracy(task="multiclass", num_classes=3, top_k=2)
print(accuracy(preds, target))

11.14. numpy

import torch
x = torch.empty(5, 3)
print(x)

print(x.size())
>> torch.Size([5, 3])
# Converting a Torch Tensor to a NumPy Array
n = torch.ones(5).numpy()
# Converting NumPy Array to Torch Tensor
t = torch.from_numpy(a)
# tensors on CUDA
if torch.cuda.is_available():
    device = torch.device("cuda")          # a CUDA device object
    y = torch.ones_like(x, device=device)
    x = x.to(device)
    z = x + y
    print(z.to("cpu", torch.double)) # back to cpu

# random
x = torch.randn(4, 4) # from a normal distribution - mean 0 and variance 1
x = torch.rand(4, 4) # on the interval [0,1)

# resize/reshape
y = x.view(16) # line
z = x.view(-1, 8) # column:  torch.Size([2, 8])

# torch.squeeze(input, dim=None, out=None) → Tensor
# tensor(A×1×B×C×1×D)
>>> x = torch.zeros(2, 1, 2, 1, 2)
torch.Size([2, 1, 2, 1, 2])
# выжимать remove 1 size dimensions
>>> y = torch.squeeze(x) #
torch.Size([2, 2, 2])
>>> y = torch.squeeze(x, 0)
torch.Size([2, 1, 2, 1, 2])
>>> y = torch.squeeze(x, 1)
torch.Size([2, 2, 1, 2])

# Concatenates sequence of tensors along a new dimension:
torch.stack(tensors: list, dim=0, out=None) → Tensor

# transpose
t = torch.tensor([[1,2,3],[4,5,6]])
torch.transpose(t,0,1)
>tensor([[1, 4],
        [2, 5],
        [3, 6]])

# add dimension
>> torch.Size([1, 2])
a.unsqueeze(0).size()
>> torch.Size([1, 1, 2])
a.unsqueeze(-1).size()
>> torch.Size([1, 2, 1])

11.15. layers

import torch.nn as nn
import torch.nn.functional as F # activation
  • CNN
    • nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1)) -

11.16. noise

       r = (0.1**0.9)*torch.randn(self.levels, batch, self.hidden_size//2, dtype=dtype, device=self.device)
self.hidden1 = (self.hidden1[0] + r, self.hidden1[1] + r)

11.17. basic nn and gradient

input 32x32

torch.Size([64, 32, 26, 26]) - batch_size, output_channels, Height, Width

Trainable parameters:

params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Trainable parameters: {params:,}")

Recap:

  • torch.Tensor - A multi-dimensional array with support for autograd operations like backward(). Also holds the gradient w.r.t. the tensor.
    • IF .requires_grad as True - it starts to track all operations on it. accumulated into .grad
    • with torch.no_grad(): - for testing
  • nn.Module - Neural network module. Convenient way of encapsulating parameters, with helpers for moving them to GPU, exporting, loading, etc.
  • nn.Parameter - A kind of Tensor, that is automatically registered as a parameter when assigned as an attribute to a Module.
  • autograd.Function - Implements forward and backward definitions of an autograd operation. Every Tensor operation creates at least a single Function node that connects to functions that created a Tensor and encodes its history.

11.17.1. first

import torch
import torch.nn as nn # layer
import torch.nn.functional as F # activation


class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        # 1 input image channel, 6 output channels, 3x3 square convolution
        # kernel
        self.conv1 = nn.Conv2d(1, 6, 3) # input 1 image to 6, 3x3 kernel, stride=1 default
        self.conv2 = nn.Conv2d(6, 16, 3)
        self.dropout1 = nn.Dropout2d(0.25)
        # an affine operation: y = Wx + b
        self.fc1 = nn.Linear(16 * 6 * 6, 120)  # 6*6 from image dimension
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        # Max pooling over a (2, 2) window
        x = F.max_pool2d(F.relu(self.conv1(x)), (2, 2))
        # If the size is a square you can only specify a single number
        x = F.max_pool2d(F.relu(self.conv2(x)), 2)
        x = x.view(-1, self.num_flat_features(x))
        x = F.relu(self.fc1(x))
        x = self.dropout1(x)
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

    def num_flat_features(self, x):
        size = x.size()[1:]  # all dimensions except the batch dimension
        num_features = 1
        for s in size:
            num_features *= s
        return num_features


net = Net()
print(net) # print all layers
params = list(net.parameters()) # learnable parameters of a model




import torch.optim as optim

# create your optimizer
optimizer = optim.SGD(net.parameters(), lr=0.01)

# in your training loop:
optimizer.zero_grad()   # zero the gradient buffers
output = net(input)
loss = criterion(output, target)
loss.backward()
optimizer.step()    # Does the updatee

11.17.2. second

import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)

        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output


def train(args, model: nn.Module, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))


def test(args, model: nn.Module, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))


def main():
    # Training settings
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=14, metavar='N',
                        help='number of epochs to train (default: 14)')
    parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
                        help='learning rate (default: 1.0)')
    parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
                        help='Learning rate step gamma (default: 0.7)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                        help='how many batches to wait before logging training status')

    parser.add_argument('--save-model', action='store_true', default=False,
                        help='For Saving the current Model')
    args = parser.parse_args()
    use_cuda = not args.no_cuda and torch.cuda.is_available()
    # random seed
    torch.manual_seed(args.seed)

    device = torch.device("cuda" if use_cuda else "cpu")

    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
    train_loader = torch.utils.data.DataLoader(
        datasets.MNIST('../data', train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=args.batch_size, shuffle=True, **kwargs)
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST('../data', train=False, transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=args.test_batch_size, shuffle=True, **kwargs)

    # load model to GPU
    model: nn.Module = Net()
    # print(model.shape)
    # print(model.parameters())
    # params = list(model.)
    # print('params', params)
    params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Trainable parameters: {params:,}")
    model = Net().to(device)

    # optimizer
    optimizer = optim.Adadelta(model.parameters(), lr=args.lr)

    scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
    for epoch in range(1, args.epochs + 1):
h        test(args, model, device, test_loader)
        scheduler.step()

    if args.save_model:
        torch.save(model.state_dict(), "mnist_cnn.pt")


if __name__ == '__main__':
    main()

11.18. LSTM

11.18.1. nn.LSTM

expects all of its inputs to be 3D tensors:

  • sequence itself
  • indexes instances in the mini-batch
  • indexes elements of the input
rnn = nn.LSTM(input_size=10, hidden_size=20, num_layers=2)
input = torch.randn(5, 3, 10)
h0 = torch.randn(2, 3, 20) # layers, batch size, hidden
c0 = torch.randn(2, 3, 20)
output, (hn, cn) = rnn(input, (h0, c0))

If the following conditions are satisfied, persistent algorithm can be selected to improve performance:

  1. cudnn is enabled
  2. input data is on the GPU
  3. input data has dtype torch.float16
  4. V100 GPU is used,
  5. input data is not in PackedSequence format

11.18.2. nn.LSTMCell

rnn = nn.LSTMCell(input_size=10, hidden_size=20)
input = torch.randn(6, 3, 10) # 3=batch size
hx = torch.randn(3, 20) # batch_size, hidden_size
cx = torch.randn(3, 20)
output = []
for i in range(6):
  hx, cx = rnn(input[i], (hx, cx))
  output.append(hx)

11.18.3. numbers of parameters

gate_size = 4 * hidden_size # = 4
w_ih = Parameter(torch.Tensor(gate_size, layer_input_size))
w_hh = Parameter(torch.Tensor(gate_size, hidden_size))
b_ih = Parameter(torch.Tensor(gate_size))
b_hh = Parameter(torch.Tensor(gate_size))
layer_params = (w_ih, w_hh, b_ih, b_hh) # one lstm

4*4 = 16 parameters

4*(4*is + 4*hs  + 4 + 4) # for first layer

11.18.4. basic

import torch
import torch.nn as nn

# num_layers = 1, bias=True, bidirectional=False
lstm = nn.LSTM(input_size=1, hidden_size=1)
inputs = [torch.randn(1, 1) for _ in range(5)]  # make a sequence of length 5

# initialize the hidden state.
hidden = (torch.randn(1, 1, 1),
          torch.randn(1, 1, 1))
for i in inputs:
    # Step through the sequence one element at a time.
    # after each step, hidden contains the hidden state.
    out, hidden = lstm(i.view(1, 1, -1), hidden)

# alternatively, we can do the entire sequence all at once.
# the first value returned by LSTM is all of the hidden states throughout
# the sequence. the second is just the most recent hidden state
# (compare the last slice of "out" with "hidden" below, they are the same)
# The reason for this is that:
# "out" will give you access to all hidden states in the sequence
# "hidden" will allow you to continue the sequence and backpropagate,
# by passing it as an argument  to the lstm at a later time
# Add the extra 2nd dimension
inputs = torch.cat(inputs).view(len(inputs), 1, -1)
hidden = (torch.randn(1, 1, 1), torch.randn(1, 1, 1))  # clean out hidden state
out, (hn, cn) = lstm(inputs, hidden)
params = sum(p.numel() for p in lstm.parameters())
print(list(lstm.parameters()))
print(f"Trainable parameters: {params:,}")
print(out)
print(hn)
print(cn)

11.18.5. tagging model

class LSTMTagger(nn.Module):

    def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size):
        super(LSTMTagger, self).__init__()
        self.hidden_dim = hidden_dim

        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)

        # The LSTM takes word embeddings as inputs, and outputs hidden states
        # with dimensionality hidden_dim.
        self.lstm = nn.LSTM(embedding_dim, hidden_dim)

        # The linear layer that maps from hidden state space to tag space
        self.hidden2tag = nn.Linear(hidden_dim, tagset_size)

    def forward(self, sentence):
        embeds = self.word_embeddings(sentence)
        lstm_out, _ = self.lstm(embeds.view(len(sentence), 1, -1))
        tag_space = self.hidden2tag(lstm_out.view(len(sentence), -1))
        tag_scores = F.log_softmax(tag_space, dim=1)
        return tag_scores

model = LSTMTagger(EMBEDDING_DIM, HIDDEN_DIM, len(word_to_ix), len(tag_to_ix))
loss_function = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)

# See what the scores are before training
# Note that element i,j of the output is the score for tag j for word i.
# Here we don't need to train, so the code is wrapped in torch.no_grad()
with torch.no_grad():
    inputs = prepare_sequence(training_data[0][0], word_to_ix)
    tag_scores = model(inputs)
    print(tag_scores)

for epoch in range(300):  # again, normally you would NOT do 300 epochs, it is toy data
    for sentence, tags in training_data:
        # Step 1. Remember that Pytorch accumulates gradients.
        # We need to clear them out before each instance
        model.zero_grad()

        # Step 2. Get our inputs ready for the network, that is, turn them into
        # Tensors of word indices.
        sentence_in = prepare_sequence(sentence, word_to_ix)
        targets = prepare_sequence(tags, tag_to_ix)

        # Step 3. Run our forward pass.
        tag_scores = model(sentence_in)

        # Step 4. Compute the loss, gradients, and update the parameters by
        #  calling optimizer.step()
        loss = loss_function(tag_scores, targets)
        loss.backward()
        optimizer.step()

# See what the scores are after training
with torch.no_grad():
    inputs = prepare_sequence(training_data[0][0], word_to_ix)
    tag_scores = model(inputs)

    # The sentence is "the dog ate the apple".  i,j corresponds to score for tag j
    # for word i. The predicted tag is the maximum scoring tag.
    # Here, we can see the predicted sequence below is 0 1 2 0 1
    # since 0 is index of the maximum value of row 1,
    # 1 is the index of maximum value of row 2, etc.
    # Which is DET NOUN VERB DET NOUN, the correct sequence!
    print(tag_scores)

11.18.7. GPU CUDA

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") if torch.cuda.is_available(): input = input.cuda() # GPU target = target.cuda() # GPU test_input = test_input.cuda() test_target = test_target.cuda()

seq: Model = Sequence() seq.double() seq = seq.to(device) # GPU

self.hidden = (torch.rand(self.levels, input.size(0), 51, dtype=torch.double), # layers, batch, hidden torch.rand(self.levels, input.size(0), 51, dtype=torch.double)) if torch.cuda.is_available(): self.hidden = (self.hidden[0].cuda(), self.hidden[1].cuda())

11.18.8. SGD

optim = torch.optim.SGD(model.parameters(), lr=0.01) lr = 0.5 * 1.2 optimizer = torch.optim.SGD(seq.parameters(), lr=lr, momentum=0.2) for s in range(STEPS): lr = lr / 1.2 print("lr", lr)

for g in optimizer.param_groups: g['lr'] = lr

11.19. Distributed - torch.distributed

11.19.1. overview

  • DistributedDataParallel (DDP)
    • torch.nn.parallel.DistributedDataParallel
  • FullyShardedDataParallel (FSDP) - “beta” higher level of complexity

    • indicate which submodules of their model to wrap together in an FSDP instance used for state sharding, or

    manually wrap submodules in FSDP instances

    • If FSDP is used without wrapping submodules in separate instances, it falls back to operating similarly to

    DDP, but without bucketing

    • torch.distributed.fsdp

Two approaches to run:

  • torch.distributed.launch
  • torchrun (elastic)

model is wrapped with DistributedDataParallel:

  • add hooks in forward() and backward() - for communicating

torch.distributed.launch

11.19.2. huggingface

11.19.4. FSDP

  1. performance optimizations
    • Mixed Precision - with BFloat16 resulted in ~5x improvement versus FP32
    • Activation Checkpointing (AC) - reinvesting the freed memory from the checkpoints into larger batch size
    • Transformer Wrapping Policy vs default wrapping policy. 20-25% slower! free 33-38% GPU memory! Freed up memory can be used to increase batch size for speed.
    • Full Shard Strategy versus zero2 (DDP) resulted in 1.5x improvement.

      transformer wrapping policy and activation checkpointing - required for 3 nodes - T5 11B model

      sharding_strategy -

    • FULL_SHARD - default -
    • SHARD_GRAD_OP - Zero2 mode - model parameters are not freed after forward pass, reducing communication needs
    • NO_SHARD - DDP mode , just copy of model, only grad synch needed
  2. ex tutorial
    import torch.distributed as dist
    
    
    world_size = 2
    rank = 0 # per worker 0 ... ?
    
    fsdp_main(rank, world_size, batch_size, test_batch_size
    
    def fsdp_main(rank, world_size, args):
        setup(rank, world_size)
    
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])
    
        dataset1 = datasets.MNIST('../data', train=True, download=True,
                            transform=transform)
        dataset2 = datasets.MNIST('../data', train=False,
                            transform=transform)
    
        sampler1 = DistributedSampler(dataset1, rank=rank, num_replicas=world_size, shuffle=True)
        sampler2 = DistributedSampler(dataset2, rank=rank, num_replicas=world_size)
    
        train_kwargs = {'batch_size': args.batch_size, 'sampler': sampler1}
        test_kwargs = {'batch_size': args.test_batch_size, 'sampler': sampler2}
        cuda_kwargs = {'num_workers': 2,
                        'pin_memory': True,
                        'shuffle': False}
        train_kwargs.update(cuda_kwargs)
        test_kwargs.update(cuda_kwargs)
    
        train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs)
        test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
        my_auto_wrap_policy = functools.partial(
            size_based_auto_wrap_policy, min_num_params=100
        )
        torch.cuda.set_device(rank)
    
    
        init_start_event = torch.cuda.Event(enable_timing=True)
        init_end_event = torch.cuda.Event(enable_timing=True)
    
        model = Net().to(rank)
    
        model = FSDP(model,
                     fsdp_auto_wrap_policy=my_auto_wrap_policy,
                     cpu_offload=CPUOffload(offload_params=True))
    
        optimizer = optim.Adadelta(model.parameters(), lr=args.lr)
    
        scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
        init_start_event.record()
        for epoch in range(1, args.epochs + 1):
            train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=sampler1)
            test(model, rank, world_size, test_loader)
            scheduler.step()
    
        init_end_event.record()
    
        if rank == 0:
            print(f"CUDA event elapsed time: {init_start_event.elapsed_time(init_end_event) / 1000}sec")
            print(f"{model}")
    
        if args.save_model:
            # use a barrier to make sure training is done on all ranks
            dist.barrier()
            # state_dict for FSDP model is only available on Nightlies for now
            states = model.state_dict()
            if rank == 0:
                torch.save(states, "mnist_cnn.pt")
    
        cleanup()
    
  3. ex t5
    from torch.distributed.fsdp import (
        FullyShardedDataParallel as FSDP,
        CPUOffload,
        MixedPrecision,
        BackwardPrefetch,
        ShardingStrategy,
        FullStateDictConfig,
        StateDictType,
    )
    from torch.utils.data.distributed import DistributedSampler
    
    class train_config:
        model_name: str="t5-base"
        run_validation: bool=True
        batch_size_training: int=4
        num_workers_dataloader: int=2
        lr: float=0.002
        weight_decay: float=0.0
        gamma: float= 0.85
        use_fp16: bool=False
        mixed_precision: bool=True
        save_model: bool=False
    
    
    
    class fsdp_config:
        mixed_precision: bool=True
        use_fp16: bool=False
        seed: int=42
        fsdp_activation_checkpointing: bool=True
        limit_all_gathers: bool=True
        sharding_strategy: ShardingStrategy = ShardingStrategy.FULL_SHARD #HYBRID_SHARD, SHARD_GRAD_OP
        checkpoint_type: StateDictType = StateDictType.FULL_STATE_DICT # alternatively can use SHARDED_STATE_DICT to avoid OOMs
        save_optimizer: bool=False
    
    
    from torch.distributed.fsdp import (
        # FullyShardedDataParallel as FSDP,
        # CPUOffload,
        MixedPrecision,
        # BackwardPrefetch,
        # ShardingStrategy,
    )
    
    # requires grad scaler in main loop
    fpSixteen = MixedPrecision(
        param_dtype=torch.float16,
        # Gradient communication precision.
        reduce_dtype=torch.float16,
        # Buffer precision.
        buffer_dtype=torch.float16,
    )
    
    bfSixteen = MixedPrecision(
        param_dtype=torch.bfloat16,
        # Gradient communication precision.
        reduce_dtype=torch.bfloat16,
        # Buffer precision.
        buffer_dtype=torch.bfloat16,
    )
    
    bfSixteen_working = MixedPrecision(
        param_dtype=torch.float32,
        reduce_dtype=torch.bfloat16,
        buffer_dtype=torch.bfloat16,
    )
    
    fp32_policy = MixedPrecision(
        param_dtype=torch.float32,
        reduce_dtype=torch.float32,
        buffer_dtype=torch.float32,
    )
    
    
    def get_policies(cfg, rank):
    
        """establish current policies for mixed precision and fsdp wrapping"""
    
        mixed_precision_policy = None
        wrapping_policy = None
    
        # mixed precision -----
        if cfg.mixed_precision:
            bfloat_available = bfloat_support()
            if bfloat_available and not cfg.use_fp16:
                mixed_precision_policy = policies.bfSixteen
                if rank == 0:
                    print(f"bFloat16 enabled for mixed precision - using bfSixteen policy")
            elif cfg.use_fp16:
                mixed_precision_policy = policies.fpSixteen
                if rank == 0:
                    print(f"FP16 enabled. ")
            else:
                # mixed_precision_policy = policies.fpSixteen
                print(
                    f"bFloat16 support not present. Will use FP32, and not mixed precision"
                )
    
        wrapping_policy = policies.get_t5_wrapper()
    
        return mixed_precision_policy, wrapping_policy
    
    
    def setup():
        # initialize the process group
        dist.init_process_group("nccl")
    
    
    def cleanup():
        dist.destroy_process_group()
    
    local_rank = int(os.environ['LOCAL_RANK'])
    rank = int(os.environ['RANK'])
    world_size = int(os.environ['WORLD_SIZE'])
    
    run_validation = True
    track_memory = True
    epochs = 1
    batch_size = 1
    test_batch_size = 1
    
    sampler1 = DistributedSampler(train_dataset, rank=rank, num_replicas=world_size, shuffle=True)
    sampler2 = DistributedSampler(val_dataset, rank=rank, num_replicas=world_size)
    
    setup()
    
    train_kwargs = {'batch_size': batch_size, 'sampler': sampler1}
    test_kwargs = {'batch_size': test_batch_size, 'sampler': sampler2}
    cuda_kwargs = {'num_workers': 2,
                   'pin_memory': True,
                   'shuffle': False}
    train_kwargs.update(cuda_kwargs)
    test_kwargs.update(cuda_kwargs)
    
    train_loader = torch.utils.data.DataLoader(train_dataset,**train_kwargs)
    val_loader = torch.utils.data.DataLoader(val_dataset, **test_kwargs)
    
    torch.cuda.set_device(local_rank)
    
    mixed_precision_policy, t5_auto_wrap_policy = get_policies(train_config, rank)
    
    # Apply FSDP wrapping to the model
    model = FSDP(model,
            auto_wrap_policy=t5_auto_wrap_policy,
            mixed_precision=mixed_precision_policy,
            sharding_strategy=fsdp_config.sharding_strategy,
            device_id=torch.cuda.current_device(),
            limit_all_gathers=fsdp_config.limit_all_gathers)
    
    # if fsdp_config.fsdp_activation_checkpointing:
    #         policies.apply_fsdp_checkpointing(model)
    
    # Set up optimizer and scheduler
    optimizer = optim.AdamW(model.parameters(), lr=train_config.lr)
    
    scheduler = StepLR(optimizer, step_size=1, gamma=train_config.gamma)
    
    best_val_loss = float("inf")
    curr_val_loss = float("inf")
    file_save_name = "T5-model-"
    
    if rank == 0:
        time_of_run = get_date_of_run()
        dur = []
        train_acc_tracking = []
        val_acc_tracking = []
        training_start_time = time.time()
    
    if rank == 0 and track_memory:
        mem_alloc_tracker = []
        mem_reserved_tracker = []
    
    
    for epoch in range(1, epochs + 1):
        t0 = time.time()
        train_accuracy = train(model, rank, world_size, train_loader, optimizer, epoch, sampler=sampler1)
        if run_validation:
            curr_val_loss = validation(model, rank, world_size, val_loader)
        scheduler.step()
    
        if rank == 0:
            print(f"--> epoch {epoch} completed...entering save and stats zone")
    
           dur.append(time.time() - t0)
           train_acc_tracking.append(train_accuracy.item())
    
           if run_validation:
               val_acc_tracking.append(curr_val_loss.item())
    
  4. troubleshooting

    RuntimeError: Expected a 'cuda' device type for generator but found 'cpu'

    • 'cuda' is set with torch.set_default_device("cuda")
    • shuffled Sampler always create generator = torch.Generator()
    • Solution: disable shuffle or set torch.set_default_device("cpu")

    RuntimeError: cannot pin 'torch.cuda.FloatTensor' only dense CPU tensors can be pinned

    • solution: place everythin of CPU according to tutorial
    • save dataset items to CPU

    CUDA error: invalid device ordinal

    • 1694694477 worker-0: CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
    • 1694694477 worker-0: For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
    • 1694694477 worker-0: Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.
    • Solution: ? I forgot, set .to(device) not .to(rank)

    Timed out initializing process group in store based barrier on rank

    • increase: torch.distributed.init_process_group(timeout=datetime.timedelta(seconds=1800))

    RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method

    • pickle.load problem with read_image no problem
    try:
        torch.multiprocessing.set_start_method('spawn',force=True)
    except RuntimeError:
        pass
    

11.19.5. elastic (launch)

torchrun - superset of the functionality as torch.distributed.launch

11.19.7. KubeFlow PyTorchJob

$ env for pod/pytorch-simple-worker-0:

KUBERNETES_SERVICE_PORT_HTTPS=443
NVIDIA_VISIBLE_DEVICES=all
KUBERNETES_SERVICE_PORT=443
PYTHONUNBUFFERED=0
HOSTNAME=pytorch-simple-worker-0
MASTER_PORT=23456
PWD=/workspace
NVIDIA_DRIVER_CAPABILITIES=compute,utility
WORLD_SIZE=2
HOME=/root
KUBERNETES_PORT_443_TCP=tcp://10.96.0.1:443
PYTORCH_VERSION=2.0.1
MASTER_ADDR=pytorch-simple-master-0
TERM=xterm
SHLVL=1
KUBERNETES_PORT_443_TCP_PROTO=tcp
KUBERNETES_PORT_443_TCP_ADDR=10.96.0.1
LD_LIBRARY_PATH=/usr/local/nvidia/lib:/usr/local/nvidia/lib64
RANK=1
KUBERNETES_SERVICE_HOST=10.96.0.1
KUBERNETES_PORT=tcp://10.96.0.1:443
KUBERNETES_PORT_443_TCP_PORT=443

11.19.8. investiage

import torch
print("distributed available", torch.distributed.is_available())
print("distributed initilized", torch.distributed.is_initialized())
# -- CUDA
torch.cuda.is_available() # True
torch.cuda.device_count() # 1
torch.cuda.current_device() # 0
torch.cuda.device(0) # <torch.cuda.device at 0x7efce0b03be0>
torch.cuda.get_device_name(0) # 'GeForce GTX 950M'
print("cuda")
print(torch.cuda.is_available()) # True
print(torch.cuda.device_count()) # 1
print(torch.cuda.current_device()) # 0
print(torch.cuda.device(0)) # <torch.cuda.device at 0x7efce0b03be0>
print(torch.cuda.get_device_name(0)) # 'GeForce GTX 950M'
print()

11.20. retain_graph

https://pytorch.org/docs/stable/autograd.html

loss.backward(retain_graph=True)

LSTM slowed becouse of hidden state saved between. Solutions:

  • detach/repackage the hidden state in between batches.
    • hidden.detach_()
    • hidden = hidden.detach()

11.21. memory management

if a is a tensor:

  • a.to(torch.device("cpu"/"cuda:0")) - move tensor around

making sure t2 is on the same device as t2

  • a = t1.get_device()
  • b = torch.tensor(a.shape).to(dev)

Using Multiple GPUs:

  • Data Parallelism, where we divide batches into smaller batches, and process these smaller batches in parallel on multiple GPU.
  • Model Parallelism, where we break the neural network into smaller sub networks and then execute these sub networks on different GPUs.
del out, loss - free tensor/model
torch.cuda.empy_cache() - empty garbage

with torch.no_grad(): - PyTorch, by default, will create a computational graph during the forward pass. During creation of this graph, it will allocate buffers to store gradients and intermediate values which are used for computing the gradient during the backward pass.

CuDNN can provided a lot of optimisation which can bring down your space usage,

  • torch.backends.cudnn.benchmark = True
  • torch.backends.cudnn.enabled = True

Using 16-bit Floats

  • model = model.half() # convert a model to 16-bit
  • input = input.half() # convert a model to 16-bit
  • issues:
    • batch-norm layers have convergence issues with half precision floats. If that's the case with you, make sure that batch norm layers are float32
    • You can have overflow issues with 16-bit float. Once, I remember I had such an overflow while trying to store the Union area of two bounding boxes (for computation of IoUs) in a float16. So make sure you have a realistic bound on the value you are trying to save in a float16.

11.22. troubleshooting

Input type (torch.FloatTensor) and weight type (torch.cuda.FloatTensor)

  • dataset on CPU, model on GPU
  • solution: Dataset.__getItem__(self, idx): return image.to(device), torch.tensor(label, dtype=torch.long).to(device)

"RuntimeError: Expected a 'cuda' device type for generator but found 'cpu'"

  • solution:
generator = torch.Generator(device=device)
train_loader: DataLoader = DataLoader(train_dataset, shuffle=True, batch_size=BATCH_SIZE, generator=generator)

AttributeError: 'collections.OrderedDict' object has no attribute 'eval'

model = TempModel()
model.load_state_dict(torch.load(file_path))

torch.cuda.OutOfMemoryError: CUDA out of memory. If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation.

os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:256"

11.23. plot learning curve

LOGFILE=torch/logs/log-2023-09-10-local.txt
cat $LOGFILE | grep "loss" | cut -d ' ' -f 4 | cut -d ',' -f 1 > /tmp/loss
cat $LOGFILE | grep "loss" | cut -d ' ' -f 7 | cut -d ',' -f 1 > /tmp/acc

python -c "
acc = [float(x[:-1]) for x in open('/tmp/acc', 'r').readlines()]
loss = [float(x[:-1]) for x in open('/tmp/loss', 'r').readlines()]
import numpy as np
acc = np.array(acc)
loss = np.array(loss)
acc = (acc - np.min(acc)) / (np.max(acc) - np.min(acc))
loss = (loss - np.min(loss)) / (np.max(loss) - np.min(loss))
import matplotlib.pyplot as plt
plt.plot(list(range(len(acc))), acc, label='accuracy')
plt.plot(list(range(len(loss))), loss, label='loss')
plt.legend()
plt.title('Scaled accuracy and loss')
plt.savefig('/tmp/a.png')
"

12. TODO PaddlePaddle 飞桨

PArallel Distributed Deep LEarning https://www.paddlepaddle.org.cn/

Created: 2024-03-03 Sun 09:52

Validate