Speed up development and make sure your code meets quality standards before shipping it.

Photo: Jeremy Perkins on Unsplash

Data set

Imagine a simple data set, but one so large that it is easier to test our changes in an integrated development environment (IDE) against a much smaller sample of the data.

Sample data set with employee data
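The screenshot is not reproduced here, but an equivalent DataFrame is easy to sketch. The following snippet is a minimal reconstruction – the column names and values are taken from the test cases later in this article:

from pyspark.sql import SparkSession

# A throwaway local session just for this sketch; the article builds a reusable one below
spark = SparkSession.builder.master("local[1]").appName("sample data").getOrCreate()

sample_df = spark.createDataFrame(
    data=[['Jan', 'Janson', 'jj@email.com', '20-504123'],
          ['Jen', 'Jenny', 'jen@email.com', '55-357378'],
          ['Bill', 'Bill', 'bill@email.com', '79-357378']],
    schema=['first_name', 'last_name', 'email', 'id'])
sample_df.show()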

Task

Imagine that in our employee data set the id column contains the age of the employee, which we need to extract into a separate column called age. The age is the number that appears before the dash ("-").

Solution

To do this, we need to extract the number before the dash symbol. This can be done in two ways – either with a regular expression (regex) or by splitting the column value on the dash. In this example, let's use a regex to pick out the age:

\d+(?=-)
  • \d matches a digit; + is a quantifier matching one or more of the preceding token (here, digits).
  • (?=-) is a positive lookahead – it matches the group after the main expression (the dash) without including it in the result.
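As a quick sanity check, the same pattern can be tried with Python's built-in re module before wiring it into Spark:

import re

# The digits before the dash are the age: '20' for the sample id '20-504123'
re.search(r'\d+(?=-)', '20-504123').group(0)  # -> '20'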
In PySpark, the extraction function looks like this:

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, regexp_extract

def extract_age_func(input_df: DataFrame, id_col: str):
    # Extract the digits before the dash into a new 'age' column
    pattern = r'\d+(?=-)'
    return input_df.withColumn('age', regexp_extract(col(id_col), pattern, 0))
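Applied to the sample data this adds the age column (a minimal usage sketch, assuming the sample_df built earlier):

result_df = extract_age_func(sample_df, 'id')
result_df.select('id', 'age').show()
# '20-504123' yields age '20', '55-357378' yields '55', and so on

Note that regexp_extract returns a string column, so casting age to an integer type would be a separate step if needed.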

Test setup

Now we move on to testing – we will use unittest, one of the most popular testing packages and part of the Python standard library. A great feature of unittest is how easy it makes creating a reusable SparkSession. Let's first write the SparkSession initialization code that we can reuse in all tests:

import unittest

from pyspark.sql import SparkSession


class PySparkTestCase(unittest.TestCase):
    """Set-up of global test SparkSession"""

    @classmethod
    def setUpClass(cls):
        cls.spark = (SparkSession
                     .builder
                     .master("local[1]")
                     .appName("PySpark unit test")
                     .getOrCreate())

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

  • setUpClass() – "Hook method for setting up class fixture before running tests in the class." Here we create the SparkSession once for all tests in the class.
  • tearDownClass() – "Hook method for deconstructing the class fixture after running all tests in the class." Once the tests are complete, we stop the SparkSession.
Let's also define two helper functions that compare DataFrames:

def test_schema(df1: DataFrame, df2: DataFrame, check_nullable=True):
    # Compare schemas on field name, data type and (optionally) nullability
    field_list = lambda fields: (fields.name, fields.dataType, fields.nullable)
    fields1 = [*map(field_list, df1.schema.fields)]
    fields2 = [*map(field_list, df2.schema.fields)]
    if check_nullable:
        res = set(fields1) == set(fields2)
    else:
        # Ignore the nullable flag when comparing
        res = set([field[:-1] for field in fields1]) == set([field[:-1] for field in fields2])
    return res

  • test_schema() – takes two DataFrames and checks whether their schemas match – returns True if they match and False if not.

def test_data(df1: DataFrame, df2: DataFrame):
    # Collect both DataFrames to the driver and compare the rows as sets
    data1 = df1.collect()
    data2 = df2.collect()
    return set(data1) == set(data2)

  • test_data() – also takes two DataFrames and checks whether the data in them matches – returns True if it matches and False if not.

Test cases

Great, we now have helper functions to check DataFrame equality. Let's write tests for our DataFrame transformation function extract_age_func():

class SimpleTestCase(PySparkTestCase):

    def test_dataparser_schema(self):
        input_df = self.spark.createDataFrame(
            data=[['Jan', 'Janson', 'jj@email.com', '20-504123'],
                  ['Jen', 'Jenny', 'jen@email.com', '55-357378'],
                  ['Bill', 'Bill', 'bill@email.com', '79-357378']],
            schema=['first_name', 'last_name', 'email', 'id'])

        transformed_df = extract_age_func(input_df, "id")

        expected_df = self.spark.createDataFrame(
            data=[['Jan', 'Janson', 'jj@email.com', '20-504123', '20'],
                  ['Jen', 'Jenny', 'jen@email.com', '55-357378', '55'],
                  ['Bill', 'Bill', 'bill@email.com', '79-357378', '79']],
            schema=['first_name', 'last_name', 'email', 'id', 'age'])

        self.assertTrue(test_schema(transformed_df, expected_df))

    def test_dataparser_data(self):
        input_df = self.spark.createDataFrame(
            data=[['Jan', 'Janson', 'jj@email.com', '20-504123'],
                  ['Jen', 'Jenny', 'jen@email.com', '55-357378'],
                  ['Bill', 'Bill', 'bill@email.com', '79-357378']],
            schema=['first_name', 'last_name', 'email', 'id'])

        transformed_df = extract_age_func(input_df, "id")

        expected_df = self.spark.createDataFrame(
            data=[['Jan', 'Janson', 'jj@email.com', '20-504123', '20'],
                  ['Jen', 'Jenny', 'jen@email.com', '55-357378', '55'],
                  ['Bill', 'Bill', 'bill@email.com', '79-357378', '79']],
            schema=['first_name', 'last_name', 'email', 'id', 'age'])

        self.assertTrue(test_data(transformed_df, expected_df))
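To run the suite, the standard unittest entry point can go at the bottom of the test module (the module name below is a placeholder):

if __name__ == '__main__':
    unittest.main()

# Or from the command line:
#   python -m unittest test_pyspark_transformations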
