Test Data Based on Real Data in PySpark

Writing rules to generate mock data can be cumbersome, especially if the project is already running: perhaps you built it directly in a production environment out of necessity, or it is a PoC that evolved, and you now realise it needs to be formalised and properly debugged while production stays stable.

To avoid manually typing in a lot of rules for creating mock data, or even worse typing in thousands of mock records yourself, you can take advantage of the dataset you already have: analyse it and define your test data around it.
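
Before the PySpark version, the idea can be shown in miniature with plain Python. This is only a sketch: profile and sample are hypothetical helper names, and the three ages stand in for a real column.

```python
import random

def profile(values):
    """Reduce a numeric column to a [min, max] rule."""
    return [min(values), max(values)]

def sample(rule, num_rows):
    """Draw num_rows integers that respect the [min, max] rule."""
    low, high = rule
    return [random.randint(low, high) for _ in range(num_rows)]

real_ages = [23, 41, 67]          # stand-in for a column of the real dataset
age_rule = profile(real_ages)     # the rule derived from the real data
mock_ages = sample(age_rule, 5)   # mock values that stay within the rule
print(age_rule, mock_ages)
```

The rest of this post does the same thing per column on a Spark DataFrame, with one kind of rule per data type.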

Let's first create a sample dataset for this case. In a real-world scenario, this is where you would reference an actual dataset to shape the rules of the test data setup.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, BooleanType, ArrayType
from datetime import datetime, timedelta
from faker import Faker
import random

# Create a Faker instance
fake = Faker()

def generate_random_dataframe(schema, num_rows):
    """
    Generate a PySpark DataFrame with a specified schema and a specified number of rows with random content.

    Parameters:
    - schema: PySpark StructType schema
    - num_rows: Number of rows to generate

    Returns:
    - PySpark DataFrame with randomly generated content based on the given schema
    """
    # Create a Spark session
    spark = SparkSession.builder.appName("RandomDataFrameGenerator").getOrCreate()

    # Extract column names and types from the schema
    column_names = [field.name for field in schema.fields]
    column_types = [field.dataType for field in schema.fields]

    # Initialize an empty list to store rows
    data = []

    # Generate random data for the specified number of rows
    for _ in range(num_rows):
        # Generate a random value for each column based on its type
        row = []
        for col_type in column_types:
            if col_type == IntegerType():
                row.append(random.randint(1, 100))
            elif col_type == StringType():
                row.append(fake.name())
            elif col_type == DateType():
                start_date = datetime(1970, 1, 1)
                random_days = random.randint(1, 365 * 50)  # Random offset of up to roughly 50 years after 1970-01-01
                row.append(start_date + timedelta(days=random_days))
            elif col_type == BooleanType():
                row.append(random.choice([True, False]))
            elif col_type == ArrayType(IntegerType()):
                row.append([random.randint(70, 100) for _ in range(random.randint(1, 5))])

        # Append the generated row to the data list
        data.append(row)

    # Create a PySpark DataFrame
    generated_dataframe = spark.createDataFrame(data, schema)

    return generated_dataframe

custom_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("dob", DateType(), True),
    StructField("is_student", BooleanType(), True),
    StructField("grades", ArrayType(IntegerType()), True)
])

num_rows_to_generate = 1000
df = generate_random_dataframe(custom_schema, num_rows_to_generate)

The next step uses simple logic around the following data types:

  • Numeric types:
    • Integer
    • Long
    • Double
  • Strings
  • Dates
  • Arrays

We run the dataset through get_data_types:

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, LongType, DoubleType, DateType, ArrayType, StringType
from pprint import pprint

def get_rules(field, df):
    numeric_types = (IntegerType, LongType, DoubleType)

    if isinstance(field.dataType, numeric_types):
        # Numeric columns: [min, max], computed in a single aggregation
        return list(df.agg(F.min(field.name), F.max(field.name)).first())
    elif isinstance(field.dataType, StringType):
        # String columns: list of distinct values
        return df.select(field.name).distinct().rdd.flatMap(lambda x: x).collect()
    elif isinstance(field.dataType, DateType):
        # Date columns: [earliest, latest] as ISO formatted strings
        min_date, max_date = df.agg(F.min(field.name), F.max(field.name)).first()
        return [min_date.strftime("%Y-%m-%d"), max_date.strftime("%Y-%m-%d")]
    elif isinstance(field.dataType, ArrayType):
        # Array columns: distinct elements across all arrays (assuming arrays of strings or integers)
        return df.select(F.explode(field.name).alias("exploded")).distinct().rdd.flatMap(lambda x: x).collect()
    else:
        # Other types (for example booleans) get no rules
        return None  # You can adjust this based on your requirements

def get_data_types(df):
    """
    Get the data type and the generation rules for each column in a PySpark DataFrame.

    Parameters:
    - df: PySpark DataFrame

    Returns:
    - List of dictionaries containing column name, data type and rules.
    """
    columns = []

    for field in df.schema.fields:
        columns.append({'column': field.name, 'type': field.dataType, 'rules': get_rules(field, df)})

    return columns

pprint(get_data_types(df))

This will output a list of dictionaries with the columns, their data types and the rules that should be applied to the test data. The rules are derived from the referenced DataFrame: numerics get a min and max, dates get a first and last date, and strings and arrays get a list of observed values. Types without a rule, such as booleans, get None.

[
{'column': 'id', 'rules': [12, 100], 'type': IntegerType()},
 {'column': 'name',
  'rules': ['Dustin Townsend',
            'Logan Wright',
            'Lori Miller',
            'Jeffery Mckee',
            'Kimberly Campbell'],
  'type': StringType()},
 {'column': 'age', 'rules': [5, 89], 'type': IntegerType()},
 {'column': 'dob', 'rules': ['1985-03-12', '1999-08-11'], 'type': DateType()},
 {'column': 'is_student', 'rules': None, 'type': BooleanType()},
 {'column': 'grades',
  'rules': [87, 73, 70, 71, 89, 94, 84, 80, 76, 97, 82, 95, 79, 74],
  'type': ArrayType(IntegerType(), True)}
]
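
Note that the 'type' entries above are PySpark type objects, which Python's json module cannot serialise as they are. If the rules should be saved to a file and carried over to a test environment, one option is to store the type as a string instead. In PySpark, field.dataType.simpleString() returns strings such as 'int' or 'array<int>'. The sketch below assumes that substitution has already been made and uses literal strings so it runs without Spark; rules_to_json and rules_from_json are hypothetical helper names.

```python
import json

# Rules in the shape produced by get_data_types, with 'type' already
# converted to a string (assumption of this sketch).
rules = [
    {"column": "id", "type": "int", "rules": [12, 100]},
    {"column": "dob", "type": "date", "rules": ["1985-03-12", "1999-08-11"]},
    {"column": "is_student", "type": "boolean", "rules": None},
    {"column": "grades", "type": "array<int>", "rules": [87, 73, 70]},
]

def rules_to_json(rules):
    """Serialise the rules so they can be written to a file."""
    return json.dumps(rules, indent=2)

def rules_from_json(text):
    """Load the rules back into a list of dictionaries."""
    return json.loads(text)

restored = rules_from_json(rules_to_json(rules))
print(restored == rules)
```

With this approach, the code that consumes the rules would have to compare against the type strings rather than against IntegerType() and friends.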

Finally, this list of rules can be run in a test environment through the following method, which will build a new DataFrame with num_rows rows based on the rules input.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, BooleanType, ArrayType
from datetime import datetime, timedelta
import random

# Create a Spark session
spark = SparkSession.builder.appName("RandomDataGenerator").getOrCreate()

def generate_random_data(schema_rules, num_rows):
    """
    Generate random data based on the specified schema rules.

    Parameters:
    - schema_rules: List of dictionaries specifying column rules
    - num_rows: Number of rows to generate

    Returns:
    - PySpark DataFrame with randomly generated data
    """
    fields = []
    data = []

    for rule in schema_rules:
        column_name = rule['column']
        data_type = rule['type']

        if data_type == IntegerType():
            min_value, max_value = rule['rules']
            data.append([random.randint(min_value, max_value) for _ in range(num_rows)])
        elif data_type == StringType():
            choices = rule['rules']
            data.append([random.choice(choices) for _ in range(num_rows)])
        elif data_type == DateType():
            start_date, end_date = rule['rules']
            start_datetime = datetime.strptime(start_date, "%Y-%m-%d")
            end_datetime = datetime.strptime(end_date, "%Y-%m-%d")
            date_range = end_datetime - start_datetime
            data.append([start_datetime + timedelta(days=random.randint(0, date_range.days)) for _ in range(num_rows)])
        elif data_type == BooleanType():
            data.append([random.choice([True, False]) for _ in range(num_rows)])
        elif data_type == ArrayType(IntegerType()):
            values = rule['rules']
            data.append([[random.choice(values) for _ in range(random.randint(1, 5))] for _ in range(num_rows)])
        else:
            # Skip column types without a generator so data and schema stay aligned
            continue

        fields.append(StructField(column_name, data_type, True))

    schema = StructType(fields)
    data_transposed = list(map(list, zip(*data)))
    rdd = spark.sparkContext.parallelize(data_transposed)
    df = spark.createDataFrame(rdd, schema=schema)

    return df

# Example usage
generated_data = generate_random_data(get_data_types(df), num_rows=50)
generated_data.show(50)
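
Because the values are drawn with Python's random module, every run produces different rows. For tests that compare against fixed expectations, seeding the generator first should make the drawn values repeatable. A minimal sketch in plain Python, where draw_ids is a hypothetical stand-in for the IntegerType branch above:

```python
import random

def draw_ids(min_value, max_value, num_rows):
    """Stand-in for the IntegerType branch: integers in [min_value, max_value]."""
    return [random.randint(min_value, max_value) for _ in range(num_rows)]

random.seed(42)                  # fix the seed before generating
first_run = draw_ids(12, 100, 5)

random.seed(42)                  # same seed again
second_run = draw_ids(12, 100, 5)

print(first_run == second_run)
```

In the PySpark version, the equivalent would presumably be a call to random.seed(...) right before generate_random_data(...), since the values are generated on the driver.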

For my run, generated_data.show(50) printed the following:

+---+--------------------+---+----------+----------+--------------------+
| id|                name|age|       dob|is_student|              grades|
+---+--------------------+---+----------+----------+--------------------+
| 63|       Melanie Wells|100|1973-10-20|     false|                [78]|
| 61|          Lori Sharp| 16|1999-03-27|     false|        [79, 77, 88]|
| 24| Miss Michaela Jones| 12|1987-09-04|      true|                [82]|
|  5|        Nina Schultz| 45|1973-02-03|     false|        [95, 99, 94]|
| 83|       Douglas Olson| 84|2002-10-19|     false|[78, 86, 72, 91, 87]|
| 19|        Michael Hill| 24|1998-02-12|      true|                [97]|
| 19|      Stefanie Casey| 47|2003-08-13|     false|[93, 83, 81, 74, 92]|
| 26|     Stephanie Marks| 16|1994-07-16|     false|    [99, 88, 85, 70]|
| 96|       Andrea Kaiser| 20|2009-04-30|      true|[96, 74, 87, 73, 95]|
| 75|      Robert Mcgrath| 30|1988-03-13|      true|                [90]|
| 49|          Cory Kelly| 11|2004-12-05|      true|        [78, 85, 75]|
| 73|        Sharon Adams| 13|2003-05-05|     false|        [73, 89, 89]|
| 45|       Danielle Love| 33|2012-02-18|      true|        [96, 90, 77]|
| 60|       Daniel Meyers| 82|2009-10-15|     false|    [89, 87, 96, 98]|
| 85|        Lori Barrett| 72|1998-07-09|     false|            [76, 87]|
| 41|           Amy Mccoy| 23|2016-10-30|     false|            [80, 77]|
| 86|         Paula Gross| 44|1999-05-16|      true|[95, 71, 71, 92, 98]|
| 71|    Wanda Mccullough| 57|1995-04-22|     false|                [91]|
| 69|Jacqueline Hernandez| 71|2006-09-30|     false|            [99, 91]|
| 41|    Brandon Mitchell| 37|1982-03-20|     false|                [72]|
| 72|         Deborah Lee| 53|2000-10-05|     false|        [73, 72, 87]|
| 84|      George Harrell| 62|2002-03-06|      true|   [73, 99, 100, 78]|
| 30|        Lori Barrett| 19|2005-06-13|     false|                [89]|
| 90|         Connie Love| 50|1996-03-02|     false|    [88, 88, 85, 74]|
| 25|       Douglas Olson| 66|1986-12-06|      true|    [72, 90, 83, 70]|
| 30|      Adam Rodriguez|  5|1999-05-17|     false|[89, 98, 84, 85, 86]|
| 77|        Wesley Smith| 33|1997-03-16|      true|            [98, 77]|
| 71|     Jonathan Jacobs| 35|1970-10-25|      true|                [80]|
| 12|        Robert Perry| 93|2018-04-29|      true|    [73, 71, 89, 70]|
| 99|     Michael Proctor| 17|1994-08-02|      true|                [92]|
| 82|      Michele Guerra| 34|1977-12-23|     false|                [86]|
| 87|       Sarah Jackson| 97|1978-06-28|     false|        [76, 75, 97]|
| 87|     Zachary Cordova|  1|1995-12-22|     false|        [85, 97, 94]|
| 47|       Dustin Carney| 36|1987-12-11|      true|               [100]|
| 83|Jonathan Rodrigue...| 73|1975-06-17|     false|    [92, 90, 83, 71]|
| 51|      Loretta Garcia| 40|2008-12-12|     false|[79, 76, 85, 87, 99]|
| 36|      George Harrell| 40|1995-07-11|     false|            [90, 76]|
| 38|      Katherine Odom| 76|2012-02-26|      true|    [77, 87, 85, 74]|
| 87|    Jeffrey Franklin| 47|1990-03-26|      true|            [73, 82]|
| 50|       Anita Roberts| 76|1994-06-26|      true|        [73, 84, 84]|
| 24|     Charles Johnson| 51|1980-03-12|     false|[85, 91, 94, 90, 90]|
|  2|      Carolyn Rivera| 28|1973-11-15|     false|        [76, 95, 89]|
| 16|   Julie Thompson MD| 38|1998-06-27|      true|        [97, 91, 72]|
| 33|       Jordan Warner|  5|2009-07-24|     false|        [71, 71, 94]|
| 60|        Troy Barrett|  1|2006-12-27|      true|   [80, 71, 76, 100]|
| 21|  Michele Pennington| 94|1994-04-11|      true|                [99]|
| 90|      Jesse Mcdonald| 13|1985-05-29|      true|[90, 90, 72, 80, 71]|
| 60|    Brandon Reynolds| 65|2009-07-12|      true|[97, 84, 79, 88, 85]|
| 35|         Brian Hines| 64|2010-08-02|     false|                [99]|
| 56|       Pamela Rivera| 88|2011-12-05|     false|[84, 74, 83, 87, 88]|
+---+--------------------+---+----------+----------+--------------------+
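
A quick way to gain confidence in the generated rows is to check them against the rules they came from. The sketch below does this on plain Python dictionaries, which in PySpark could be obtained with [row.asDict() for row in generated_data.collect()]. The violations helper and the 'kind' key are hypothetical and not part of the code above; they replace the PySpark type objects so the example runs without Spark.

```python
def violations(rows, rules):
    """Return (row_index, column, value) for every value that breaks its rule."""
    found = []
    for index, row in enumerate(rows):
        for rule in rules:
            value = row[rule["column"]]
            if rule["kind"] == "range":            # numerics: [min, max]
                low, high = rule["rules"]
                ok = low <= value <= high
            elif rule["kind"] == "choices":        # strings: allowed values
                ok = value in rule["rules"]
            elif rule["kind"] == "array_choices":  # arrays: allowed elements
                ok = all(item in rule["rules"] for item in value)
            else:                                  # no rule, nothing to check
                ok = True
            if not ok:
                found.append((index, rule["column"], value))
    return found

rules = [
    {"column": "age", "kind": "range", "rules": [5, 89]},
    {"column": "name", "kind": "choices", "rules": ["Lori Miller", "Logan Wright"]},
    {"column": "grades", "kind": "array_choices", "rules": [70, 71, 73]},
]
rows = [
    {"age": 16, "name": "Lori Miller", "grades": [70, 73]},
    {"age": 93, "name": "Logan Wright", "grades": [71]},
]
print(violations(rows, rules))  # prints [(1, 'age', 93)]
```

An empty result means every generated value stays within the rules extracted from the reference dataset.
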
Emil Moe

Software- and Data Engineer

