Published on

Advanced Machine Learning: Deep Learning, NLP, Computer Vision, and MLOps

Table of Contents

Introduction

Welcome to advanced machine learning—where you'll move beyond traditional algorithms into deep learning, natural language processing, computer vision, and production-scale ML systems. This guide covers cutting-edge techniques used by leading AI researchers and engineers at companies like Google, OpenAI, and Meta.

You'll learn to build neural networks from scratch, fine-tune transformers for NLP tasks, create computer vision systems, and deploy ML models at scale with MLOps best practices.

Prerequisites: This is an advanced guide. You should be comfortable with:

  • Intermediate ML concepts (ensemble methods, feature engineering)
  • Python, NumPy, pandas, and scikit-learn
  • Linear algebra and calculus basics
  • Model training and evaluation

Need to catch up? Start with our Beginner's Guide or Intermediate Guide.

Deep Learning Fundamentals

Neural Networks from Scratch

Understanding the math behind neural networks is crucial for building custom architectures.

import numpy as np
import matplotlib.pyplot as plt

class NeuralNetwork:
    """Fully connected feed-forward classifier implemented with NumPy only.

    Hidden layers use ReLU and the output layer uses softmax; training
    minimises cross-entropy with mini-batch gradient descent.
    """

    def __init__(self, layers):
        """
        layers: list of layer sizes [input_size, hidden1, hidden2, ..., output_size]
        """
        self.layers = layers
        self.num_layers = len(layers)
        self.weights = []
        self.biases = []

        # He initialization: scale by sqrt(2 / fan_in), zero biases
        for fan_in, fan_out in zip(layers[:-1], layers[1:]):
            self.weights.append(np.random.randn(fan_in, fan_out) * np.sqrt(2.0 / fan_in))
            self.biases.append(np.zeros((1, fan_out)))

    def relu(self, x):
        """Element-wise max(0, x)."""
        return np.maximum(0, x)

    def relu_derivative(self, x):
        """1.0 where x > 0, else 0.0."""
        return (x > 0).astype(float)

    def sigmoid(self, x):
        """Logistic function; the input is clipped to [-500, 500] to avoid overflow in exp."""
        return 1 / (1 + np.exp(-np.clip(x, -500, 500)))

    def sigmoid_derivative(self, x):
        """Derivative of the logistic function evaluated at x."""
        s = self.sigmoid(x)
        return s * (1 - s)

    def softmax(self, x):
        """Row-wise softmax; the row maximum is subtracted first for numerical stability."""
        exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)

    def forward_propagation(self, X):
        """Forward pass; caches activations and pre-activations for backprop.

        Returns the softmax output of the last layer.
        """
        self.activations = [X]
        self.z_values = []
        last = self.num_layers - 2  # index of the output layer

        for i in range(self.num_layers - 1):
            z = np.dot(self.activations[-1], self.weights[i]) + self.biases[i]
            self.z_values.append(z)
            # ReLU for hidden layers, softmax for the output layer
            self.activations.append(self.relu(z) if i < last else self.softmax(z))

        return self.activations[-1]

    def compute_loss(self, y_true, y_pred):
        """Mean cross-entropy between one-hot `y_true` and predicted probabilities."""
        m = y_true.shape[0]
        # 1e-8 guards against log(0)
        return -np.sum(y_true * np.log(y_pred + 1e-8)) / m

    def backward_propagation(self, y_true):
        """Backward pass: fills `gradients_w` / `gradients_b` from the cached forward pass."""
        m = y_true.shape[0]
        grads_w = []
        grads_b = []

        # Softmax + cross-entropy: the output-layer error is (prediction - target)
        delta = self.activations[-1] - y_true

        for i in range(self.num_layers - 2, -1, -1):
            grads_w.append(np.dot(self.activations[i].T, delta) / m)
            grads_b.append(np.sum(delta, axis=0, keepdims=True) / m)

            if i > 0:
                # Propagate the error through the weights and the ReLU of the previous layer
                delta = np.dot(delta, self.weights[i].T) * self.relu_derivative(self.z_values[i - 1])

        # Gradients were collected from the last layer to the first; restore layer order
        self.gradients_w = grads_w[::-1]
        self.gradients_b = grads_b[::-1]

    def update_parameters(self, learning_rate):
        """Apply one gradient-descent step to every weight matrix and bias vector."""
        for i in range(len(self.weights)):
            self.weights[i] -= learning_rate * self.gradients_w[i]
            self.biases[i] -= learning_rate * self.gradients_b[i]

    def train(self, X, y, epochs, learning_rate, batch_size=32, verbose=True):
        """Train with mini-batch gradient descent.

        Every sample is used in each epoch: a trailing batch smaller than
        `batch_size` is trained on as well, so datasets with fewer than
        `batch_size` samples work too.

        Returns a dict with per-epoch 'loss' (sample-weighted mean of the
        batch losses) and 'accuracy' (measured on the full `X`, `y`).

        Raises ValueError if `X` is empty or `batch_size` is not positive.
        """
        m = X.shape[0]
        if m == 0:
            raise ValueError("X must contain at least one sample")
        if batch_size <= 0:
            raise ValueError("batch_size must be positive")

        history = {'loss': [], 'accuracy': []}

        for epoch in range(epochs):
            # Shuffle data
            indices = np.random.permutation(m)
            X_shuffled = X[indices]
            y_shuffled = y[indices]

            epoch_loss = 0.0

            for start in range(0, m, batch_size):
                X_batch = X_shuffled[start:start + batch_size]
                y_batch = y_shuffled[start:start + batch_size]

                y_pred = self.forward_propagation(X_batch)
                # Weight by batch size so a short final batch does not skew the epoch mean
                epoch_loss += self.compute_loss(y_batch, y_pred) * X_batch.shape[0]

                self.backward_propagation(y_batch)
                self.update_parameters(learning_rate)

            # Evaluate on full dataset
            y_pred_full = self.forward_propagation(X)
            predictions = np.argmax(y_pred_full, axis=1)
            true_labels = np.argmax(y, axis=1)
            accuracy = np.mean(predictions == true_labels)

            avg_loss = epoch_loss / m
            history['loss'].append(avg_loss)
            history['accuracy'].append(accuracy)

            if verbose and (epoch % 10 == 0 or epoch == epochs - 1):
                print(f"Epoch {epoch}/{epochs} - Loss: {avg_loss:.4f} - Accuracy: {accuracy:.4f}")

        return history

# Example usage
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Load the scikit-learn digits dataset
digits = load_digits()
X = digits.data
y = digits.target

# Standardize the features with StandardScaler
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# One-hot encode the integer labels (10 classes)
y_onehot = np.eye(10)[y]

# Hold out 20% of the samples for testing (fixed seed for reproducibility)
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y_onehot, test_size=0.2, random_state=42
)

# 64 inputs -> 128 -> 64 -> 10 outputs
nn = NeuralNetwork([64, 128, 64, 10])
history = nn.train(X_train, y_train, epochs=100, learning_rate=0.01, batch_size=32)

# Accuracy on the held-out split
y_pred = nn.forward_propagation(X_test)
predictions = np.argmax(y_pred, axis=1)
true_labels = np.argmax(y_test, axis=1)
test_accuracy = np.mean(predictions == true_labels)
print(f"\nTest Accuracy: {test_accuracy:.4f}")

# One subplot per recorded training curve, side by side
plt.figure(figsize=(12, 4))
for position, (key, label) in enumerate([('loss', 'Loss'), ('accuracy', 'Accuracy')], start=1):
    plt.subplot(1, 2, position)
    plt.plot(history[key])
    plt.title(f'Training {label}')
    plt.xlabel('Epoch')
    plt.ylabel(label)
plt.tight_layout()
plt.show()

Deep Learning with PyTorch

PyTorch is one of the most widely used frameworks for both research and production ML.

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchvision.transforms as transforms

# Use the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

# Custom Dataset
class CustomDataset(Dataset):
    """Dataset that stores features as a float tensor and labels as a long tensor.

    `transform`, when given, is applied to each feature sample on access.
    """

    def __init__(self, X, y, transform=None):
        self.X, self.y = torch.FloatTensor(X), torch.LongTensor(y)
        self.transform = transform

    def __len__(self):
        """Number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the (sample, label) pair at `idx`."""
        features = self.X[idx]
        target = self.y[idx]
        if self.transform:
            features = self.transform(features)
        return features, target

# Build Neural Network
class DeepNN(nn.Module):
    """MLP classifier.

    Each hidden size contributes Linear -> BatchNorm1d -> ReLU -> Dropout;
    a final Linear layer maps to `num_classes` outputs.
    """

    def __init__(self, input_size, hidden_sizes, num_classes, dropout_rate=0.5):
        super().__init__()

        widths = [input_size, *hidden_sizes]
        modules = []
        for in_features, out_features in zip(widths[:-1], widths[1:]):
            modules += [
                nn.Linear(in_features, out_features),
                nn.BatchNorm1d(out_features),
                nn.ReLU(),
                nn.Dropout(dropout_rate),
            ]

        # Classification head
        modules.append(nn.Linear(widths[-1], num_classes))

        self.network = nn.Sequential(*modules)

    def forward(self, x):
        """Run `x` through the stacked layers and return the head's output."""
        return self.network(x)

# Training function
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over `loader` and return (mean batch loss, accuracy).

    With an `optimizer` the model is put in training mode and updated after
    every batch; without one it is put in eval mode and gradients are disabled.
    """
    training = optimizer is not None
    model.train(training)

    total_loss = 0.0
    correct = 0
    seen = 0

    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)

            if training:
                optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            if training:
                loss.backward()
                optimizer.step()

            total_loss += loss.item()
            predicted = outputs.argmax(dim=1)
            seen += labels.size(0)
            correct += (predicted == labels).sum().item()

    return total_loss / len(loader), correct / seen


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device,
                scheduler=None):
    """Train `model` for `num_epochs`, validating after every epoch.

    Args:
        model: module to optimise.
        train_loader / val_loader: sized iterables of (inputs, labels) batches.
        criterion: loss callable taking (outputs, labels).
        optimizer: optimizer bound to the model's parameters.
        num_epochs: number of passes over `train_loader`.
        device: device each batch is moved to.
        scheduler: optional LR scheduler stepped once per epoch;
            ReduceLROnPlateau receives the validation loss.

    Returns:
        dict with per-epoch 'train_loss', 'train_acc', 'val_loss', 'val_acc'.
    """
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}

    for epoch in range(num_epochs):
        train_loss, train_acc = _run_epoch(model, train_loader, criterion, device, optimizer)
        val_loss, val_acc = _run_epoch(model, val_loader, criterion, device)

        if scheduler is not None:
            # ReduceLROnPlateau needs the monitored metric; other schedulers step blindly
            if isinstance(scheduler, optim.lr_scheduler.ReduceLROnPlateau):
                scheduler.step(val_loss)
            else:
                scheduler.step()

        # Save history
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        print(f'Epoch [{epoch+1}/{num_epochs}] '
              f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | '
              f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

    return history

# Initialize model
model = DeepNN(
    input_size=64,
    hidden_sizes=[256, 128, 64],
    num_classes=10,
    dropout_rate=0.3
).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)

# Learning rate scheduler: halves the LR after 5 epochs without improvement.
# `verbose=True` is intentionally not passed: it is deprecated in recent PyTorch releases.
# NOTE: the scheduler only has an effect if scheduler.step(val_loss) is called once per epoch.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=5
)

# Create data loaders (labels are converted from one-hot back to class indices)
train_dataset = CustomDataset(X_train, y_train.argmax(axis=1))
val_dataset = CustomDataset(X_test, y_test.argmax(axis=1))

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

# Train model
history = train_model(
    model, train_loader, val_loader,
    criterion, optimizer, num_epochs=50, device=device
)

# Save model weights, optimizer state and the training curves in one checkpoint
torch.save({
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'history': history
}, 'model_checkpoint.pth')

Convolutional Neural Networks (CNNs)

CNNs are the foundation of computer vision.

class ConvNet(nn.Module):
    """CNN with three double-convolution stages followed by a two-layer classifier.

    Each stage halves the spatial size; the classifier expects 256 * 4 * 4
    features, i.e. 3-channel 32x32 inputs.
    """

    def __init__(self, num_classes=10):
        super().__init__()

        # Feature extractor: 3 -> 64 -> 128 -> 256 channels
        stages = []
        for in_channels, out_channels in ((3, 64), (64, 128), (128, 256)):
            stages.extend(self._conv_stage(in_channels, out_channels))
        self.conv_layers = nn.Sequential(*stages)

        # Classifier head
        self.fc_layers = nn.Sequential(
            nn.Linear(256 * 4 * 4, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    @staticmethod
    def _conv_stage(in_channels, out_channels):
        """Layers of one stage: two 3x3 conv/BN/ReLU groups, 2x2 max-pool, dropout."""
        return [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.25),
        ]

    def forward(self, x):
        """Return class logits for a batch of images."""
        features = self.conv_layers(x)
        flat = features.view(features.size(0), -1)  # one feature vector per sample
        return self.fc_layers(flat)

# Normalization shared by the training and test pipelines
normalize = transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))

# Training pipeline: random augmentation, then tensor conversion and normalization
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    normalize,
])

# Test pipeline: no augmentation
transform_test = transforms.Compose([transforms.ToTensor(), normalize])

# Load CIFAR-10 (downloaded into ./data when missing)
trainset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform_train
)
testset = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True, transform=transform_test
)

# Only the training loader shuffles
train_loader = DataLoader(trainset, batch_size=128, shuffle=True, num_workers=2)
test_loader = DataLoader(testset, batch_size=128, shuffle=False, num_workers=2)

# Model, loss, optimizer and schedule for the CNN
cnn_model = ConvNet(num_classes=10).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(cnn_model.parameters(), lr=0.001, weight_decay=1e-4)
# NOTE(review): `scheduler` is not passed to train_model below, so this snippet never steps it.
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=100)

history_cnn = train_model(
    cnn_model, train_loader, test_loader,
    criterion, optimizer, num_epochs=100, device=device
)

Recurrent Neural Networks (RNNs) and LSTMs

RNNs and LSTMs are designed for sequential data such as time series and text.

class LSTMModel(nn.Module):
    """Bidirectional multi-layer LSTM followed by a linear classifier.

    The classifier reads the LSTM output at the last time step, which
    concatenates both directions (hence `hidden_size * 2` input features).
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout=0.5):
        super().__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # batch_first=True: inputs are indexed (batch, time, feature)
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout,
            bidirectional=True,
        )

        self.fc = nn.Linear(2 * hidden_size, num_classes)

    def forward(self, x):
        """Return class logits computed from the last time step of `x`."""
        # One zero state per layer and direction
        state_shape = (2 * self.num_layers, x.size(0), self.hidden_size)
        h0 = torch.zeros(state_shape, device=x.device)
        c0 = torch.zeros(state_shape, device=x.device)

        sequence_out, _ = self.lstm(x, (h0, c0))

        last_step = sequence_out[:, -1, :]
        return self.fc(last_step)

# GRU alternative (often faster than LSTM)
class GRUModel(nn.Module):
    """Bidirectional multi-layer GRU followed by a linear classifier.

    The classifier reads the GRU output at the last time step, which
    concatenates both directions (hence `hidden_size * 2` input features).
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout=0.5):
        super(GRUModel, self).__init__()

        # forward() needs both values to build the initial hidden state
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.gru = nn.GRU(
            input_size, hidden_size, num_layers,
            batch_first=True, dropout=dropout, bidirectional=True
        )
        self.fc = nn.Linear(hidden_size * 2, num_classes)  # *2 for bidirectional

    def forward(self, x):
        """Return class logits computed from the last time step of `x`."""
        # One zero state per layer and direction
        h0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.gru(x, h0)
        out = self.fc(out[:, -1, :])
        return out

Natural Language Processing (NLP)

Transformers and Attention Mechanism

Transformers revolutionized NLP with the attention mechanism.

from transformers import (
    AutoTokenizer, AutoModelForSequenceClassification,
    Trainer, TrainingArguments, AutoModel
)
from datasets import load_dataset
import torch.nn.functional as F

# Load pre-trained model and tokenizer
# The same checkpoint name is used for both so the vocabulary matches the weights.
model_name = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# num_labels=2: a two-class classification head (compute_metrics below uses average='binary')
model = AutoModelForSequenceClassification.from_pretrained(
    model_name, num_labels=2
)

# Load dataset
# The "train" and "test" splits are used by the Trainer further down.
dataset = load_dataset("imdb")

# Tokenization function
def tokenize_function(examples):
    """Tokenize the "text" field, padding and truncating every example to 512 tokens."""
    tokenizer_kwargs = {
        "padding": "max_length",
        "truncation": True,
        "max_length": 512,
    }
    return tokenizer(examples["text"], **tokenizer_kwargs)

# Tokenize dataset
# batched=True hands tokenize_function a batch of examples per call.
tokenized_datasets = dataset.map(tokenize_function, batched=True)

# Training arguments
# Evaluation and checkpointing both happen once per epoch, which
# load_best_model_at_end relies on to pick the best checkpoint by "accuracy"
# (the key returned by compute_metrics).
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy` — confirm against the installed version.
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=100,
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="accuracy"
)

# Metrics
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

def compute_metrics(pred):
    """Return accuracy, F1, precision and recall for a binary classification evaluation.

    `pred` must expose `label_ids` (true labels) and `predictions` (per-class
    scores); the predicted class is the argmax over the last axis.
    """
    y_true = pred.label_ids
    y_hat = pred.predictions.argmax(-1)

    # Tuple layout: (precision, recall, f1, support)
    prf = precision_recall_fscore_support(y_true, y_hat, average='binary')

    return {
        'accuracy': accuracy_score(y_true, y_hat),
        'f1': prf[2],
        'precision': prf[0],
        'recall': prf[1],
    }

# Trainer
# Trains on the "train" split and evaluates on the "test" split with compute_metrics.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["test"],
    compute_metrics=compute_metrics
)

# Train
trainer.train()

# Evaluate
results = trainer.evaluate()
print(results)

# Save model
# Model and tokenizer go to the same directory so they can be reloaded together.
model.save_pretrained("./fine_tuned_bert")
tokenizer.save_pretrained("./fine_tuned_bert")

Advanced NLP: Custom Transformer Architecture

class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    `d_model` is split evenly across `num_heads` heads of width `d_k`.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Query, key, value and output projections
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Return (weighted values, attention weights).

        Positions where `mask == 0` are filled with -1e9 before the softmax.
        """
        scale = torch.sqrt(torch.tensor(self.d_k, dtype=torch.float32))
        scores = torch.matmul(Q, K.transpose(-2, -1)) / scale

        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        weights = F.softmax(scores, dim=-1)
        return torch.matmul(weights, V), weights

    def split_heads(self, x):
        """(batch, seq, d_model) -> (batch, heads, seq, d_k)."""
        batch, seq, _ = x.size()
        per_head = x.view(batch, seq, self.num_heads, self.d_k)
        return per_head.transpose(1, 2)

    def combine_heads(self, x):
        """(batch, heads, seq, d_k) -> (batch, seq, d_model); inverse of split_heads."""
        batch, _, seq, _ = x.size()
        merged = x.transpose(1, 2).contiguous()
        return merged.view(batch, seq, self.d_model)

    def forward(self, Q, K, V, mask=None):
        """Project the inputs, attend per head, and return (output, attention weights)."""
        q_heads = self.split_heads(self.W_q(Q))
        k_heads = self.split_heads(self.W_k(K))
        v_heads = self.split_heads(self.W_v(V))

        context, weights = self.scaled_dot_product_attention(q_heads, k_heads, v_heads, mask)
        return self.W_o(self.combine_heads(context)), weights

class TransformerBlock(nn.Module):
    """Encoder block: self-attention and a feed-forward network, each wrapped in
    dropout, a residual connection and LayerNorm (applied after the residual add).
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()

        self.attention = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

        # Position-wise feed-forward network: d_model -> d_ff -> d_model
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model)
        )

        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Apply the block to `x`; `mask` is forwarded to the attention layer."""
        # Self-attention sub-layer (query, key and value are all `x`)
        attended, _ = self.attention(x, x, x, mask)
        x = self.norm1(x + self.dropout(attended))

        # Feed-forward sub-layer
        x = self.norm2(x + self.dropout(self.feed_forward(x)))
        return x

class CustomTransformer(nn.Module):
    """Transformer encoder classifier.

    Token embeddings plus fixed sinusoidal positional encodings pass through
    `num_layers` TransformerBlocks; the result is mean-pooled over the
    sequence and projected to `num_classes` logits.
    """

    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, num_classes, dropout=0.1):
        super(CustomTransformer, self).__init__()

        self.embedding = nn.Embedding(vocab_size, d_model)
        # Non-persistent buffer: it follows the module across .to()/.cuda() calls but
        # stays out of state_dict, so the checkpoint layout is unchanged.
        self.register_buffer(
            "positional_encoding",
            self.create_positional_encoding(max_seq_length, d_model),
            persistent=False,
        )

        self.transformer_blocks = nn.ModuleList([
            TransformerBlock(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])

        self.fc = nn.Linear(d_model, num_classes)
        self.dropout = nn.Dropout(dropout)

    def create_positional_encoding(self, max_seq_length, d_model):
        """Return sinusoidal encodings of shape (1, max_seq_length, d_model).

        Even feature indices hold sines and odd indices hold cosines.
        """
        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))

        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one cosine column fewer than sine columns
        pe[:, 1::2] = torch.cos(angles)[:, :d_model // 2]

        return pe.unsqueeze(0)

    def forward(self, x, mask=None):
        """Return class logits for a batch of token-id sequences.

        `x` is indexed (batch, sequence); its length must not exceed the
        `max_seq_length` given at construction.
        """
        # Embedding and positional encoding
        x = self.embedding(x)
        seq_length = x.size(1)
        x = x + self.positional_encoding[:, :seq_length, :]
        x = self.dropout(x)

        # Transformer blocks
        for transformer_block in self.transformer_blocks:
            x = transformer_block(x, mask)

        # Classification
        x = x.mean(dim=1)  # Global average pooling over the sequence
        x = self.fc(x)

        return x

# Example usage
# d_model (512) must be divisible by num_heads (8): MultiHeadAttention asserts it.
transformer_model = CustomTransformer(
    vocab_size=10000,
    d_model=512,
    num_heads=8,
    num_layers=6,
    d_ff=2048,
    max_seq_length=512,
    num_classes=2,
    dropout=0.1
).to(device)

Text Generation with GPT-style Models

from transformers import GPT2LMHeadModel, GPT2Tokenizer, TextDataset, DataCollatorForLanguageModeling
from transformers import Trainer, TrainingArguments

# Load pre-trained GPT-2
model_name = "gpt2"
model = GPT2LMHeadModel.from_pretrained(model_name)
tokenizer = GPT2Tokenizer.from_pretrained(model_name)
# Reuse the end-of-sequence token as the padding token
tokenizer.pad_token = tokenizer.eos_token

# Fine-tune on custom text
def load_dataset_for_lm(file_path, tokenizer, block_size=128):
    """Build a TextDataset from the text file at `file_path`.

    `tokenizer` and `block_size` are forwarded unchanged to TextDataset.
    """
    return TextDataset(
        tokenizer=tokenizer,
        file_path=file_path,
        block_size=block_size,
    )

# Training arguments
# A checkpoint is written every 10,000 steps; save_total_limit=2 caps how many are kept.
training_args = TrainingArguments(
    output_dir="./gpt2-finetuned",
    overwrite_output_dir=True,
    num_train_epochs=3,
    per_device_train_batch_size=4,
    save_steps=10_000,
    save_total_limit=2,
    prediction_loss_only=True,
)

# Data collator
# mlm=False: no masked-language-model objective is used for GPT-2
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer, mlm=False
)

# Train
# "train.txt" is read from the current working directory.
train_dataset = load_dataset_for_lm("train.txt", tokenizer)
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=train_dataset,
)
trainer.train()

# Generate text
def generate_text(prompt, model, tokenizer, max_length=100, temperature=0.7, top_k=50, top_p=0.95):
    """Sample a continuation of `prompt` from a causal language model.

    Args:
        prompt: text to continue.
        model: model exposing `eval()`, `device` and `generate()`.
        tokenizer: tokenizer exposing `encode()`, `decode()` and `eos_token_id`.
        max_length: maximum total length (prompt plus continuation) in tokens.
        temperature, top_k, top_p: sampling controls passed to `generate()`.

    Returns:
        The decoded text of the single generated sequence, special tokens removed.
    """
    model.eval()
    # Follow the model's own device instead of relying on a module-level global
    input_ids = tokenizer.encode(prompt, return_tensors='pt').to(model.device)

    with torch.no_grad():
        output = model.generate(
            input_ids,
            # A single unpadded prompt: every position is a real token
            attention_mask=torch.ones_like(input_ids),
            max_length=max_length,
            # Without sampling, decoding is greedy and temperature/top_k/top_p are ignored
            do_sample=True,
            temperature=temperature,
            top_k=top_k,
            top_p=top_p,
            num_return_sequences=1,
            pad_token_id=tokenizer.eos_token_id
        )

    generated_text = tokenizer.decode(output[0], skip_special_tokens=True)
    return generated_text

# Example
# Continue a short prompt with the fine-tuned model and print the result.
prompt = "The future of artificial intelligence is"
generated = generate_text(prompt, model, tokenizer)
print(generated)

Computer Vision Advanced Techniques

Transfer Learning with Pre-trained Models

import torchvision.models as models
from torch import nn

# Load pre-trained ResNet50
# NOTE(review): `pretrained=True` is deprecated in newer torchvision releases in favour of
# the `weights=` argument — confirm against the installed version.
resnet = models.resnet50(pretrained=True)

# Freeze the whole backbone (every parameter, not only the early layers)
for param in resnet.parameters():
    param.requires_grad = False

# Number of target classes for the new task; adjust to your dataset
num_classes = 10

# Replace final layer with a new head; its parameters are trainable by default
num_features = resnet.fc.in_features
resnet.fc = nn.Sequential(
    nn.Dropout(0.5),
    nn.Linear(num_features, 512),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(512, num_classes)
)

# Only train the new layers
resnet = resnet.to(device)
optimizer = optim.Adam(resnet.fc.parameters(), lr=0.001)

# Fine-tune entire network after initial training
def unfreeze_model(model):
    """Mark every parameter of `model` as trainable again."""
    for parameter in model.parameters():
        parameter.requires_grad_(True)

# After 10 epochs, unfreeze and train with lower learning rate
# NOTE(review): the initial training epochs are not shown in this snippet; as written,
# unfreezing happens immediately after the setup above.
unfreeze_model(resnet)
# A new optimizer over all parameters, with a 10x smaller learning rate (1e-4 vs 1e-3)
optimizer = optim.Adam(resnet.parameters(), lr=0.0001)

Object Detection with Faster R-CNN

import torch
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# Load pre-trained Faster R-CNN
model = fasterrcnn_resnet50_fpn(pretrained=True)

# Replace the classifier
# The new box predictor keeps the existing input feature size and outputs num_classes scores.
num_classes = 91  # COCO has 91 classes
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

model = model.to(device)

# Inference
# NOTE(review): `images` is not defined in this snippet — presumably a list of image
# tensors already on `device`; confirm against the caller.
model.eval()
with torch.no_grad():
    predictions = model(images)

# Parse predictions
# Each prediction is indexed by 'boxes', 'labels' and 'scores'.
for pred in predictions:
    boxes = pred['boxes']
    labels = pred['labels']
    scores = pred['scores']

    # Filter by confidence
    # Boolean mask: keep only detections scoring above 0.5
    keep = scores > 0.5
    boxes = boxes[keep]
    labels = labels[keep]
    scores = scores[keep]

Image Segmentation

from torchvision.models.segmentation import deeplabv3_resnet50

# Load DeepLabV3
segmentation_model = deeplabv3_resnet50(pretrained=True)
segmentation_model = segmentation_model.to(device)
segmentation_model.eval()

# Predict
# NOTE(review): `input_batch` is not defined in this snippet — presumably a normalized
# image batch on `device`; confirm against the caller.
with torch.no_grad():
    # ['out'][0] selects the output for the first item of the batch
    output = segmentation_model(input_batch)['out'][0]
    # argmax over dimension 0 picks the highest-scoring index for each position
    output_predictions = output.argmax(0)

# Visualize segmentation
import matplotlib.pyplot as plt
from torchvision import transforms

def decode_segmap(image, nc=21):
    """Convert a 2-D array of class indices into an RGB image.

    Args:
        image: integer array of per-pixel class labels.
        nc: number of classes to colour. Labels >= nc stay black; values
            above the palette size (21) are clamped to it.

    Returns:
        uint8 array of shape image.shape + (3,).
    """
    # 21-entry Pascal VOC colour map; index 0 is the background
    label_colors = np.array([
        (0, 0, 0),
        (128, 0, 0), (0, 128, 0), (128, 128, 0), (0, 0, 128), (128, 0, 128),
        (0, 128, 128), (128, 128, 128), (64, 0, 0), (192, 0, 0), (64, 128, 0),
        (192, 128, 0), (64, 0, 128), (192, 0, 128), (64, 128, 128), (192, 128, 128),
        (0, 64, 0), (128, 64, 0), (0, 192, 0), (128, 192, 0), (0, 64, 128),
    ], dtype=np.uint8)

    image = np.asarray(image)
    rgb = np.zeros(image.shape + (3,), dtype=np.uint8)

    for label in range(min(nc, len(label_colors))):
        rgb[image == label] = label_colors[label]

    return rgb

# Move the predicted label map to the CPU as a NumPy array, colour it, and display it
segmented_image = decode_segmap(output_predictions.cpu().numpy())
plt.imshow(segmented_image)
plt.show()

MLOps: Production Machine Learning

Experiment Tracking with MLflow

import mlflow
import mlflow.pytorch

# Start MLflow run
with mlflow.start_run() as run:
    # Keep the run ID so the model logged below can be referenced later
    # (for example as "runs:/<run_id>/model")
    run_id = run.info.run_id

    # Log parameters
    mlflow.log_param("learning_rate", 0.001)
    mlflow.log_param("batch_size", 64)
    # Log the value the loop below actually uses rather than a hard-coded number
    mlflow.log_param("num_epochs", num_epochs)
    mlflow.log_param("model_architecture", "ResNet50")

    # Train model
    # NOTE(review): `train_epoch` and `validate` are not defined in this snippet;
    # each is expected to return a (loss, accuracy) pair.
    for epoch in range(num_epochs):
        train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion, device)
        val_loss, val_acc = validate(model, val_loader, criterion, device)

        # Log metrics, using the epoch index as the step
        mlflow.log_metric("train_loss", train_loss, step=epoch)
        mlflow.log_metric("train_acc", train_acc, step=epoch)
        mlflow.log_metric("val_loss", val_loss, step=epoch)
        mlflow.log_metric("val_acc", val_acc, step=epoch)

    # Log model under the artifact path "model"
    mlflow.pytorch.log_model(model, "model")

    # Log artifacts (both files must already exist on disk)
    mlflow.log_artifact("config.yaml")
    mlflow.log_artifact("training_plot.png")

# Query runs and pick the one with the highest validation accuracy
runs = mlflow.search_runs()
best_run = runs.sort_values("metrics.val_acc", ascending=False).iloc[0]
print(f"Best run ID: {best_run['run_id']}")

Model Versioning and Registry

from mlflow.tracking import MlflowClient

# Name under which the model is registered and later looked up
registered_name = "fraud_detection_model"

client = MlflowClient()

# Register the model artifact logged by run `run_id`
model_uri = f"runs:/{run_id}/model"
model_details = mlflow.register_model(model_uri, registered_name)

# Move the freshly registered version to the "Production" stage
client.transition_model_version_stage(
    name=registered_name,
    version=model_details.version,
    stage="Production",
)

# Load whichever version currently sits in the "Production" stage
production_model = mlflow.pyfunc.load_model(
    model_uri=f"models:/{registered_name}/Production"
)

# Score new data with the production model
predictions = production_model.predict(new_data)

Model Serving with FastAPI

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
import torch

app = FastAPI()

# Load model
# NOTE(security): torch.load unpickles the file and can execute arbitrary code, so
# load trusted checkpoints only. weights_only=False is needed because the file holds
# a whole pickled module (eval() is called on it) rather than a state_dict.
# map_location places the model on the same device the request tensors are moved to.
model = torch.load('model.pth', map_location=device, weights_only=False)
model.eval()

class PredictionRequest(BaseModel):
    """Request body of POST /predict."""

    # Feature values of one sample; predict() turns them into a float tensor
    features: list

class PredictionResponse(BaseModel):
    """Response body of POST /predict."""

    # Index of the class with the highest probability
    prediction: int
    # Softmax probability of the predicted class
    probability: float
    # Version string reported by the service
    model_version: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Classify one feature vector and return the predicted class with its probability.

    Any exception raised while preprocessing or predicting is reported as
    HTTP 500 with the exception text as detail.
    """
    try:
        # Add a batch dimension so the model sees a batch of one sample
        batch = torch.FloatTensor(request.features).unsqueeze(0).to(device)

        with torch.no_grad():
            probs = torch.softmax(model(batch), dim=1)
            predicted_class = torch.argmax(probs, dim=1).item()

        confidence = float(probs[0][predicted_class])
        return PredictionResponse(
            prediction=predicted_class,
            probability=confidence,
            model_version="1.0.0",
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    """Liveness probe: reports a fixed status and whether a model object is loaded."""
    model_loaded = model is not None
    return {"status": "healthy", "model_loaded": model_loaded}

# Start the server only when this file is run directly (not when it is imported)
if __name__ == "__main__":
    # 0.0.0.0 listens on all interfaces; port 8000
    uvicorn.run(app, host="0.0.0.0", port=8000)

Containerization with Docker

# Dockerfile
# Builds the image that serves the FastAPI app with uvicorn on port 8000.
FROM python:3.9-slim

WORKDIR /app

# Install dependencies
# requirements.txt is copied on its own first so this layer is rebuilt only when the
# dependency list changes, not on every source change.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Expose port
EXPOSE 8000

# Run application
# "main:app" refers to the `app` object in main.py
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
# Runs the ML API container built from the Dockerfile in this directory.
# NOTE(review): recent Docker Compose releases treat the top-level `version` key as
# obsolete — confirm whether it can be dropped.
version: '3.8'
services:
  ml-api:
    build: .
    # host:container port mapping
    ports:
      - '8000:8000'
    # Path of the model file inside the container (inside the volume mounted below)
    environment:
      - MODEL_PATH=/models/model.pth
    # Mount the local ./models directory at /models
    volumes:
      - ./models:/models
    # Resource caps: 2 CPUs and 4G of memory
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 4G

CI/CD Pipeline for ML

# .github/workflows/ml-pipeline.yml
# Test -> train -> deploy pipeline, triggered by pushes and pull requests targeting main.
name: ML Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  # Run the unit tests
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          # Quoted so YAML keeps it a string (an unquoted 3.10 would be read as 3.1)
          python-version: '3.9'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest

      - name: Run tests
        run: pytest tests/

  # Train and evaluate the model once the tests pass
  train:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      # Every job starts on a fresh runner, so Python and the dependencies
      # have to be set up again here.
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Train model
        run: python train.py

      - name: Evaluate model
        run: python evaluate.py

      - name: Upload model artifact
        uses: actions/upload-artifact@v4
        with:
          name: model
          path: models/

  # Deploy only for the main branch
  deploy:
    needs: train
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Deploy to production
        run: |
          # Deploy model to production environment
          echo "Deploying model..."

Model Monitoring in Production

import prometheus_client
from prometheus_client import Counter, Histogram, Gauge
import time

# Define metrics
# Module-level Prometheus collectors; each is created with (metric name, help text).
# Updated by the HTTP middleware below (one increment per observed request)
prediction_counter = Counter('predictions_total', 'Total predictions')
# Fed with the elapsed seconds measured around each request by the middleware
prediction_latency = Histogram('prediction_latency_seconds', 'Prediction latency')
# Set by ModelMonitor.calculate_performance
model_accuracy = Gauge('model_accuracy', 'Current model accuracy')
# Set by ModelMonitor.check_drift (mean of the per-feature KS-test p-values)
data_drift_score = Gauge('data_drift_score', 'Data drift score')

@app.middleware("http")
async def monitor_requests(request, call_next):
    """Record latency and count for every request that reaches the model API.

    Args:
        request: Incoming HTTP request.
        call_next: Callable that forwards the request to the next handler and
            returns its response.

    Returns:
        The downstream response, unmodified.
    """
    # perf_counter is monotonic, so the measured duration cannot go negative
    # or jump when the system clock is adjusted.
    start_time = time.perf_counter()

    response = await call_next(request)

    # Health checks and Prometheus scrapes are not predictions; counting them
    # would inflate predictions_total and skew the latency histogram.
    if request.url.path not in ("/health", "/metrics"):
        duration = time.perf_counter() - start_time
        prediction_latency.observe(duration)
        prediction_counter.inc()

    return response

@app.get("/metrics")
async def metrics():
    """Expose all registered Prometheus metrics in the text exposition format.

    Returns:
        A raw HTTP response whose body is the output of ``generate_latest()``
        and whose content type is the one Prometheus scrapers expect.
    """
    # Local import keeps this snippet self-contained within the FastAPI app.
    from fastapi import Response

    # Returning the bytes directly would make FastAPI serialise them as a JSON
    # string; wrap them in a Response so the payload is sent verbatim.
    return Response(
        content=prometheus_client.generate_latest(),
        media_type=prometheus_client.CONTENT_TYPE_LATEST,
    )

# Batch monitoring
class ModelMonitor:
    """Rolling in-memory monitor for feature drift and prediction accuracy.

    Logged predictions are kept in a bounded list (``window_size`` entries).
    Drift and accuracy are computed over the most recent ``RECENT_WINDOW``
    entries and pushed to the ``data_drift_score`` / ``model_accuracy`` gauges.

    Args:
        reference_data: Baseline feature matrix, indexed as
            ``reference_data[:, i]`` per feature column.
        window_size: Maximum number of logged predictions to retain.
    """

    # Number of most-recent records used for drift and accuracy checks
    RECENT_WINDOW = 100

    def __init__(self, reference_data, window_size=1000):
        self.reference_data = reference_data
        self.predictions = []
        # Retained for backward compatibility; not read by this class
        self.actuals = []
        self.window_size = window_size

    def log_prediction(self, features, prediction, actual=None):
        """Append one prediction record and trim the log to ``window_size``.

        Args:
            features: Indexable sequence of feature values for the request.
            prediction: Model output for those features.
            actual: Ground-truth label if already known, else ``None``.
        """
        self.predictions.append({
            'timestamp': time.time(),
            'features': features,
            'prediction': prediction,
            'actual': actual
        })

        # Drop the oldest entries once the window is exceeded
        overflow = len(self.predictions) - self.window_size
        if overflow > 0:
            del self.predictions[:overflow]

    def check_drift(self):
        """Compare recent feature values with the reference data.

        Runs a two-sample Kolmogorov-Smirnov test per feature column, stores
        the mean p-value in the ``data_drift_score`` gauge and reports drift
        when that mean is below 0.05.

        Returns:
            bool: ``True`` if drift is detected; ``False`` otherwise, including
            when no predictions (or no features) have been logged yet.
        """
        recent_features = [
            p['features'] for p in self.predictions[-self.RECENT_WINDOW:]
        ]
        # Nothing logged yet: there is no evidence of drift, and indexing
        # recent_features[0] below would raise IndexError.
        if not recent_features:
            return False

        from scipy.stats import ks_2samp

        p_values = []
        for i in range(len(recent_features[0])):
            recent_values = [f[i] for f in recent_features]
            reference_values = self.reference_data[:, i]

            _, p_value = ks_2samp(reference_values, recent_values)
            p_values.append(p_value)

        # Zero-length feature vectors: avoid np.mean([]) -> nan
        if not p_values:
            return False

        # NOTE(review): averaging p-values can hide drift confined to a single
        # feature; kept as-is so the gauge keeps its existing meaning.
        avg_drift = np.mean(p_values)
        data_drift_score.set(avg_drift)

        return bool(avg_drift < 0.05)  # Drift detected if p-value < 0.05

    def calculate_performance(self):
        """Compute accuracy over the most recent labelled predictions.

        Returns:
            Accuracy over the last ``RECENT_WINDOW`` labelled records (also
            written to the ``model_accuracy`` gauge), or ``None`` when fewer
            than ``RECENT_WINDOW`` labelled records are available.
        """
        labelled = [p for p in self.predictions if p['actual'] is not None]

        # A full window is enough to score; requiring one extra record would
        # delay the first measurement for no benefit.
        if len(labelled) < self.RECENT_WINDOW:
            return None

        window = labelled[-self.RECENT_WINDOW:]
        actuals = [p['actual'] for p in window]
        preds = [p['prediction'] for p in window]

        accuracy = np.mean(np.array(actuals) == np.array(preds))
        model_accuracy.set(accuracy)

        return accuracy

# Module-level monitor; X_train (expected to be defined elsewhere) is passed as
# the reference_data that drift checks compare recent features against.
monitor = ModelMonitor(X_train)

Advanced Topics

Few-Shot Learning

from transformers import GPT2LMHeadModel, GPT2Tokenizer

def few_shot_classification(examples, query, model, tokenizer, max_new_tokens=10):
    """Classify ``query`` as positive/negative via a few-shot prompt.

    Args:
        examples: Iterable of ``(text, label)`` pairs shown to the model as
            demonstrations.
        query: Text to classify.
        model: Causal language model exposing ``generate``.
        tokenizer: Matching tokenizer exposing ``encode``, ``decode`` and
            ``eos_token_id``.
        max_new_tokens: Upper bound on generated tokens (default 10, the same
            budget as the previous ``max_length=len(prompt) + 10``).

    Returns:
        str: First line of the model's completion, stripped; ``""`` if the
        model produced no text.
    """
    # Build the prompt in one pass instead of repeated string concatenation
    shots = "".join(
        f"Text: {example}\nSentiment: {label}\n\n" for example, label in examples
    )
    prompt = (
        "Classify the following text as positive or negative:\n\n"
        + shots
        + f"Text: {query}\nSentiment:"
    )

    # Generate
    input_ids = tokenizer.encode(prompt, return_tensors='pt')
    prompt_length = len(input_ids[0])
    output = model.generate(
        input_ids,
        max_new_tokens=max_new_tokens,
        # GPT-2 style tokenizers define no pad token; reuse EOS for padding
        pad_token_id=tokenizer.eos_token_id,
    )

    # Decode only the newly generated tokens. Splitting the full text on the
    # last "Sentiment:" returns the wrong label when the model keeps going and
    # emits another "Text: ... Sentiment: ..." pair.
    completion = tokenizer.decode(output[0][prompt_length:], skip_special_tokens=True)
    lines = completion.strip().splitlines()

    return lines[0].strip() if lines else ""

# Example
# Load GPT-2 explicitly: the classifier needs a causal LM and its tokenizer.
# Dedicated names are used so any other `model` / `tokenizer` objects already
# in scope are not replaced.
gpt2_tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
gpt2_model = GPT2LMHeadModel.from_pretrained("gpt2")
gpt2_model.eval()  # inference only

# Labelled demonstrations placed in the prompt
examples = [
    ("This movie was amazing!", "positive"),
    ("Terrible waste of time", "negative"),
    ("Loved every minute of it", "positive")
]

query = "The film was incredibly boring"
result = few_shot_classification(examples, query, gpt2_model, gpt2_tokenizer)

Neural Architecture Search (NAS)

from ray import tune
from ray.tune.schedulers import ASHAScheduler

def train_model_with_config(config):
    """Train one Ray Tune trial and report its validation loss every epoch.

    Args:
        config: Sampled hyperparameters. Reads ``num_layers``, ``hidden_size``,
            ``dropout``, ``learning_rate`` and ``num_epochs``.

    Relies on ``create_model``, ``train_epoch``, ``validate``, ``train_loader``,
    ``val_loader`` and ``criterion`` being defined at module level.
    """
    import torch.optim as optim  # local import keeps the trainable self-contained

    model = create_model(
        num_layers=config["num_layers"],
        hidden_size=config["hidden_size"],
        dropout=config["dropout"]
    )

    # The optimizer has to be created per trial: it must hold THIS model's
    # parameters and use the learning rate sampled for this trial.
    # Assumes create_model returns a torch module exposing .parameters().
    optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])

    # NOTE(review): config["batch_size"] is sampled but has no effect here
    # because train_loader / val_loader are built outside this function.

    # Train
    for _ in range(config["num_epochs"]):
        train_epoch(model, train_loader, optimizer, criterion)
        val_loss = validate(model, val_loader, criterion)

        # Report to Ray Tune. Only the loss is reported: no accuracy value is
        # computed in this function, and the scheduler optimises "loss".
        tune.report(loss=val_loss)

# Define search space
# This dict is handed to tune.run below, which passes a sampled instance of it
# to train_model_with_config as `config`.
config = {
    "num_layers": tune.choice([2, 3, 4, 5]),
    "hidden_size": tune.choice([64, 128, 256, 512]),
    "dropout": tune.uniform(0.1, 0.5),
    "learning_rate": tune.loguniform(1e-4, 1e-1),
    "batch_size": tune.choice([16, 32, 64, 128]),
    "num_epochs": 50  # fixed value, identical for every trial
}

# Run hyperparameter search
# 100 sampled configurations; the ASHA scheduler is configured to minimise the
# reported "loss" metric. Each trial requests 4 CPUs and 1 GPU.
analysis = tune.run(
    train_model_with_config,
    config=config,
    num_samples=100,
    scheduler=ASHAScheduler(metric="loss", mode="min"),
    resources_per_trial={"cpu": 4, "gpu": 1}
)

# Pick the configuration with the lowest reported loss
best_config = analysis.get_best_config(metric="loss", mode="min")
print(f"Best config: {best_config}")

Federated Learning

import torch
import copy

class FederatedLearning:
    """Minimal FedAvg trainer: local client training plus parameter averaging.

    Args:
        global_model: Model whose weights are copied to every client at the
            start of a round and overwritten with the averaged client weights.
        client_data_loaders: One iterable of ``(inputs, labels)`` batches per
            client.
        num_rounds: Number of communication rounds executed by ``train``.
        test_loader: Optional iterable of ``(inputs, labels)`` batches used by
            ``evaluate_global_model``. When omitted, evaluation is skipped.
    """

    def __init__(self, global_model, client_data_loaders, num_rounds=10,
                 test_loader=None):
        self.global_model = global_model
        self.client_data_loaders = client_data_loaders
        self.num_rounds = num_rounds
        self.test_loader = test_loader

    def train_client(self, client_id, client_loader, num_epochs=1):
        """Train a copy of the global model on one client's data.

        Args:
            client_id: Index of the client (not used by the training itself).
            client_loader: Iterable of ``(inputs, labels)`` batches.
            num_epochs: Number of local passes over ``client_loader``.

        Returns:
            The ``state_dict`` of the locally trained copy.
        """
        # Deep copy so local updates never touch the shared global model
        model = copy.deepcopy(self.global_model)
        optimizer = optim.SGD(model.parameters(), lr=0.01)
        criterion = nn.CrossEntropyLoss()

        model.train()
        for _ in range(num_epochs):
            for inputs, labels in client_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

        return model.state_dict()

    def aggregate_models(self, client_models):
        """FedAvg: replace the global weights with the unweighted client mean.

        Args:
            client_models: Non-empty list of client ``state_dict`` objects with
                the same keys as the global model.

        Raises:
            ValueError: If ``client_models`` is empty.
        """
        if not client_models:
            raise ValueError("aggregate_models requires at least one client model")

        global_dict = self.global_model.state_dict()

        for key, current in global_dict.items():
            stacked = torch.stack(
                [state[key].float() for state in client_models], 0
            )
            # Cast back so non-float entries keep their original dtype
            global_dict[key] = stacked.mean(0).to(current.dtype)

        self.global_model.load_state_dict(global_dict)

    def evaluate_global_model(self):
        """Return the global model's accuracy on ``test_loader``.

        Returns:
            float in ``[0, 1]``; ``0.0`` for an empty loader; ``None`` when no
            ``test_loader`` was supplied.
        """
        if self.test_loader is None:
            return None

        self.global_model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in self.test_loader:
                predicted = self.global_model(inputs).argmax(dim=1)
                correct += (predicted == labels).sum().item()
                total += labels.size(0)

        return correct / total if total else 0.0

    def train(self):
        """Federated training loop"""
        for round_num in range(self.num_rounds):
            print(f"Round {round_num + 1}/{self.num_rounds}")

            # Train on all clients
            client_models = [
                self.train_client(client_id, client_loader)
                for client_id, client_loader in enumerate(self.client_data_loaders)
            ]

            # Aggregate
            self.aggregate_models(client_models)

            # Evaluate global model (skipped when no test data was provided)
            accuracy = self.evaluate_global_model()
            if accuracy is not None:
                print(f"Global model accuracy: {accuracy:.4f}")

# Example usage
# `model` and the three client loaders are expected to be defined elsewhere;
# each loader supplies one client's (inputs, labels) batches.
federated_learning = FederatedLearning(
    global_model=model,
    client_data_loaders=[client1_loader, client2_loader, client3_loader],
    num_rounds=20
)
# Runs all 20 rounds of local training followed by aggregation
federated_learning.train()

Best Practices for Advanced ML

1. Reproducibility

import random
import numpy as np
import torch

def set_seed(seed=42):
    """Seed Python, NumPy and PyTorch RNGs and set the cuDNN determinism flags.

    Args:
        seed: Integer seed applied to every random number generator.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


set_seed(42)

2. Mixed Precision Training

from torch.cuda.amp import autocast, GradScaler

# One GradScaler is created up front and reused for every step of the loop.
# `model`, `optimizer`, `criterion`, `train_loader` and `num_epochs` are
# expected to be defined elsewhere.
scaler = GradScaler()

for epoch in range(num_epochs):
    for inputs, labels in train_loader:
        # Clear gradients left over from the previous batch
        optimizer.zero_grad()

        # Forward pass with autocasting
        # Only the forward pass and the loss computation run inside autocast
        with autocast():
            outputs = model(inputs)
            loss = criterion(outputs, labels)

        # Backward pass with gradient scaling
        # The order matters: backward on the scaled loss, then step the
        # optimizer through the scaler, then update the scaler.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

3. Gradient Accumulation

# Number of mini-batches whose gradients are summed before one optimizer step
accumulation_steps = 4

# Start from clean gradients so nothing stale leaks into the first group
optimizer.zero_grad()
has_pending_grads = False

for i, (inputs, labels) in enumerate(train_loader):
    outputs = model(inputs)
    loss = criterion(outputs, labels)
    loss = loss / accumulation_steps  # Normalize loss

    loss.backward()
    has_pending_grads = True

    if (i + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()
        has_pending_grads = False

# When the number of batches is not a multiple of accumulation_steps, the last
# partial group would otherwise be dropped (and its gradients would linger).
if has_pending_grads:
    optimizer.step()
    optimizer.zero_grad()

Resources for Continued Learning

You've now mastered advanced machine learning! Continue your journey:

Research Papers:

Courses:

Communities:

Conclusion

You've completed the advanced ML journey! You can now:

  • Build custom neural network architectures
  • Fine-tune state-of-the-art transformers
  • Create computer vision systems
  • Deploy production ML systems with MLOps
  • Contribute to cutting-edge AI research

The field of AI/ML evolves rapidly. Stay curious, keep experimenting, and never stop learning!

Want to start from the beginning? Check out our Beginner's Guide or Intermediate Guide.

Related Articles