Project - Spam classifier

Build a model to classify email as spam or ham. First, download examples of spam and ham from Apache SpamAssassin’s public datasets and then train a model to classify email.

Objectives:

  1. Fetch the data

    import os
    import tarfile
    import urllib.request
    
    DOWNLOAD_ROOT = "http://spamassassin.apache.org/old/publiccorpus/"
    HAM_URL = DOWNLOAD_ROOT + "20030228_easy_ham.tar.bz2"
    SPAM_URL = DOWNLOAD_ROOT + "20030228_spam.tar.bz2"
    SPAM_PATH = os.path.join("datasets", "spam")
    
    def fetch_spam_data(ham_url=HAM_URL, spam_url=SPAM_URL, spam_path=SPAM_PATH):
        if not os.path.isdir(spam_path):
            os.makedirs(spam_path)
        for filename, url in (("ham.tar.bz2", ham_url), ("spam.tar.bz2", spam_url)):
            path = os.path.join(spam_path, filename)
            if not os.path.isfile(path):
                urllib.request.urlretrieve(url, path)
            tar_bz2_file = tarfile.open(path)
            tar_bz2_file.extractall(path=spam_path)
            tar_bz2_file.close()
    
    fetch_spam_data()
    
  2. Now list the filenames of the ham and spam emails (the length check filters out non-email files in the archives):

    HAM_DIR = os.path.join(SPAM_PATH, "easy_ham")
    SPAM_DIR = os.path.join(SPAM_PATH, "spam")
    ham_filenames = [name for name in sorted(os.listdir(HAM_DIR)) if len(name) > 20]
    spam_filenames = [name for name in sorted(os.listdir(SPAM_DIR)) if len(name) > 20]
    
    len(ham_filenames)
    
    len(spam_filenames)
    
  3. Use Python's email module to parse these emails (this handles headers, encoding, and so on):

    import email
    import email.parser
    import email.policy
    
    def load_email(is_spam, filename, spam_path=SPAM_PATH):
        directory = "spam" if is_spam else "easy_ham"
        with open(os.path.join(spam_path, directory, filename), "rb") as f:
            return email.parser.BytesParser(policy=email.policy.default).parse(f)
    
    ham_emails = [load_email(is_spam=False, filename=name) for name in ham_filenames]
    spam_emails = [load_email(is_spam=True, filename=name) for name in spam_filenames]
    
  4. Let's look at one example of ham and one example of spam, to get a feel for what the data looks like:

    print(ham_emails[1].get_content().strip())
    
    print(spam_emails[6].get_content().strip())
    
  5. Some emails are actually multipart, with images and attachments (which can have their own attachments). Let's look at the various types of structures we have:

    def get_email_structure(email):
        if isinstance(email, str):
            return email
        payload = email.get_payload()
        if isinstance(payload, list):
            return "multipart({})".format(", ".join([
                get_email_structure(sub_email)
                for sub_email in payload
            ]))
        else:
            return email.get_content_type()
    
    from collections import Counter
    
    def structures_counter(emails):
        structures = Counter()
        for email in emails:
            structure = get_email_structure(email)
            structures[structure] += 1
        return structures
    
    structures_counter(ham_emails).most_common()
    
    structures_counter(spam_emails).most_common()
    
  6. Now let's take a look at the email headers:

    for header, value in spam_emails[0].items():
        print(header, ":", value)
    
  7. You need to focus on the Subject header:

    spam_emails[0]["Subject"]
    
  8. Now split the data into a training set and a test set (a possible completion of the placeholder is sketched after the code):

    import numpy as np
    from sklearn.model_selection import train_test_split
    
    X = np.array(ham_emails + spam_emails, dtype=object)
    y = np.array([0] * len(ham_emails) + [1] * len(spam_emails))
    
    X_train, X_test, y_train, y_test = <<your code goes here>>(X, y, test_size=0.2, random_state=42)
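    
    # A possible completion of the placeholder above (a sketch using the usual
    # scikit-learn idiom; any reproducible 80/20 split would do):
    from sklearn.model_selection import train_test_split
    
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42)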
    
  9. Let's start writing the preprocessing functions. First, we will need a function to convert HTML to plain text. The following function first drops the <head> section, then converts all <a> tags to the word HYPERLINK, then gets rid of all remaining HTML tags, leaving only the plain text. For readability, it also replaces multiple newlines with single newlines, and finally it unescapes HTML entities (such as &gt; or &nbsp;):

    import re
    from html import unescape
    
    def html_to_plain_text(html):
        text = re.sub('<head.*?>.*?</head>', '', html, flags=re.M | re.S | re.I)
        text = re.sub(r'<a\s.*?>', ' HYPERLINK ', text, flags=re.M | re.S | re.I)
        text = re.sub('<.*?>', '', text, flags=re.M | re.S)
        text = re.sub(r'(\s*\n)+', '\n', text, flags=re.M | re.S)
        return unescape(text)
    
    html_spam_emails = [email for email in X_train[y_train==1]
                        if get_email_structure(email) == "text/html"]
    sample_html_spam = html_spam_emails[7]
    print(sample_html_spam.get_content().strip()[:1000], "...")
    
    print(html_to_plain_text(sample_html_spam.get_content())[:1000], "...")
    
  10. Now let's write a function that takes an email as input and returns its content as plain text, whatever its format is:

    def email_to_text(email):
        html = None
        for part in email.walk():
            ctype = part.get_content_type()
            if ctype not in ("text/plain", "text/html"):
                continue
            try:
                content = part.get_content()
            except: # in case of encoding issues
                content = str(part.get_payload())
            if ctype == "text/plain":
                return content
            else:
                html = content
        if html:
            return html_to_plain_text(html)
    
    print(email_to_text(sample_html_spam)[:100], "...")
    
  11. Now install NLTK:

    !conda install -y nltk

    try:
        import nltk
    
        stemmer = nltk.PorterStemmer()
        for word in ("Computations", "Computation", "Computing", "Computed", "Compute", "Compulsive"):
            print(word, "=>", stemmer.stem(word))
    except ImportError:
        print("Error: stemming requires the NLTK module.")
        stemmer = None
    
  12. You will also need a way to replace URLs with the word "URL".

    try:
        import google.colab
        !pip install -q -U urlextract
    except ImportError:
        pass
    
    try:
        import urlextract # may require an Internet connection to download root domain names
    
        url_extractor = urlextract.URLExtract()
        print(url_extractor.find_urls("Will it detect github.com and https://youtu.be/7Pq-S557XQU?t=3m32s"))
    except ImportError:
        print("Error: replacing URLs requires the urlextract module.")
        url_extractor = None
    
  13. Now put all this together into a transformer that we will use to convert emails to word counters.

    from sklearn.base import BaseEstimator, TransformerMixin
    
    class EmailToWordCounterTransformer(BaseEstimator, TransformerMixin):
        def __init__(self, strip_headers=True, lower_case=True, remove_punctuation=True,
                     replace_urls=True, replace_numbers=True, stemming=True):
            self.strip_headers = strip_headers
            self.lower_case = lower_case
            self.remove_punctuation = remove_punctuation
            self.replace_urls = replace_urls
            self.replace_numbers = replace_numbers
            self.stemming = stemming
        def fit(self, X, y=None):
            return self
        def transform(self, X, y=None):
            X_transformed = []
            for email in X:
                text = email_to_text(email) or ""
                if self.lower_case:
                    text = text.lower()
                if self.replace_urls and url_extractor is not None:
                    urls = list(set(url_extractor.find_urls(text)))
                    urls.sort(key=lambda url: len(url), reverse=True)
                    for url in urls:
                        text = text.replace(url, " URL ")
                if self.replace_numbers:
                    text = re.sub(r'\d+(?:\.\d*)?(?:[eE][+-]?\d+)?', 'NUMBER', text)
                if self.remove_punctuation:
                    text = re.sub(r'\W+', ' ', text, flags=re.M)
                word_counts = Counter(text.split())
                if self.stemming and stemmer is not None:
                    stemmed_word_counts = Counter()
                    for word, count in word_counts.items():
                        stemmed_word = stemmer.stem(word)
                        stemmed_word_counts[stemmed_word] += count
                    word_counts = stemmed_word_counts
                X_transformed.append(word_counts)
            return np.array(X_transformed)
    
    
    X_few = X_train[:3]
    X_few_wordcounts = EmailToWordCounterTransformer().fit_transform(X_few)
    X_few_wordcounts
    
  14. Now we need to convert these word counts to vectors:

    from scipy.sparse import csr_matrix
    
    class WordCounterToVectorTransformer(BaseEstimator, TransformerMixin):
        def __init__(self, vocabulary_size=1000):
            self.vocabulary_size = vocabulary_size
        def fit(self, X, y=None):
            total_count = Counter()
            for word_count in X:
                for word, count in word_count.items():
                    total_count[word] += min(count, 10)
            most_common = total_count.most_common()[:self.vocabulary_size]
            self.most_common_ = most_common
            self.vocabulary_ = {word: index + 1 for index, (word, count) in enumerate(most_common)}
            return self
        def transform(self, X, y=None):
            rows = []
            cols = []
            data = []
            for row, word_count in enumerate(X):
                for word, count in word_count.items():
                    rows.append(row)
                    cols.append(self.vocabulary_.get(word, 0))
                    data.append(count)
            return csr_matrix((data, (rows, cols)), shape=(len(X), self.vocabulary_size + 1))
    
    vocab_transformer = WordCounterToVectorTransformer(vocabulary_size=10)
    X_few_vectors = vocab_transformer.fit_transform(X_few_wordcounts)
    X_few_vectors
    
    X_few_vectors.toarray()
    
    vocab_transformer.vocabulary_
    
  15. Now let's transform the whole dataset:

    from sklearn.pipeline import Pipeline
    
    preprocess_pipeline = Pipeline([
        ("email_to_wordcount", EmailToWordCounterTransformer()),
        ("wordcount_to_vector", WordCounterToVectorTransformer()),
    ])
    
    X_train_transformed = preprocess_pipeline.fit_transform(X_train)
    
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import cross_val_score
    
  16. Now use logistic regression on the transformed data; one possible approach is sketched below:
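
    # A minimal sketch (not the only valid solution): train a logistic regression
    # classifier on the transformed training set and score it with 3-fold
    # cross-validation. The solver settings and fold count below are choices, not
    # requirements of the exercise. Imports are repeated so the sketch stands alone.
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import cross_val_score
    
    log_clf = LogisticRegression(max_iter=1000, random_state=42)
    score = cross_val_score(log_clf, X_train_transformed, y_train, cv=3, verbose=3)
    print(score.mean())
    
    # Optionally, check precision and recall on the held-out test set
    # (the test set must be transformed with the already-fitted pipeline).
    from sklearn.metrics import precision_score, recall_score
    
    X_test_transformed = preprocess_pipeline.transform(X_test)
    log_clf = LogisticRegression(max_iter=1000, random_state=42)
    log_clf.fit(X_train_transformed, y_train)
    y_pred = log_clf.predict(X_test_transformed)
    print("Precision: {:.2%}".format(precision_score(y_test, y_pred)))
    print("Recall: {:.2%}".format(recall_score(y_test, y_pred)))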

