使用NumPy、Numba和Python异步编程的高性能大数据分析与对比

编辑:光环大数据 来源: 互联网 时间: 2017-10-20 10:15 阅读:

  使用NumPy、Numba和Python异步编程的高性能大数据分析与对比。几个月前,一位客户问我:“目前大数据分析中较快的Python数据结构对象是什么?”我总被问到类似的问题。其中有一些问题很难解决,通常需要多花一些时间才能找到合适的优化解决方案。我一般会在周末和晚上做这些事,并以此为乐。

以前关于这个问题,我第一个简单的答案是PythonList对象。我曾在许多数据科学项目中使用List对象,包括数据管道和提取-转换-加载(ETL)生产系统等。然后我就想到了以下问题:我可以使用List对象进行数百万或数十亿行数据的操作和分析吗?如果我将数据科学项目切分成许多小任务,然后使用最新的Pythonasyncio库异步运行它们,会怎么样呢?基于这些问题,我决定抽出一些时间,借助Python数据生态系统库寻找可用于大数据分析的一些实用解决方案。为了便于读者理解和快速验证结果,程序将计算由浮点数组成的一维NumPy数组的算术平均值、中值和样本标准差。为了对比程序的运行时间,我将使用以下库:

NumPy-NumPy是用于科学计算的基础Python包。

Numba-Numba提供了由Python直接编写的高性能函数来加速应用程序的能力。通过几个注释,面向数组和数学计算较多的Python代码就可以被实时编译为原生机器指令。而且Numba拥有类似于C、C++和FORTRAN的性能,无需切换语言或Python解释器。

asyncio-asyncio是Python异步编程库。

为什么要使用NumPy?

正如NumPy网站所说:“NumPy是用于科学计算的基础Python包。它提供了强大的N维数组对象和复杂的(广播)功能。”导入NumPy库之后,Python程序的性能更好、执行速度更快、更容易保证一致性并能方便地使用大量的数学运算和矩阵功能。也许正因为如此,我们不再需要使用PythonList对象了?重要的是,许多Python数据生态系统库都基于NumPy之上,像Pandas、SciPy、Matplotlib等等。

用到的Python算法

在此我将通过算术平均值、中值和样本标准差的简单计算,来展示不同Python程序的运行时间并进行对比。测试数据来自由64位浮点数构成的一维NumPy数组。我将实现以下三种Python算法并对它们进行分析:

NumPy数组

结合asyncio异步库的NumPy数组

结合Numba库的NumPy数组

NumPy数组程序

我们来看看每个算法的代码。每个算法都有遵循面向对象编程(OOP)方法的类对象和主调用程序。类对象包含以下五种方法:

calculate_number_observation()-计算观测数

calculate_arithmetic_mean()-计算算术平均值

calculate_median()-计算中值

calculate_sample_standard_deviation()-计算样本标准差

print_exception_message()-如果出现异常,打印异常信息

列表1显示了单独使用NumPy数组的汇总统计类对象代码。

importsys

importtraceback

importtime

frommathimportsqrt

classSummaryStatistics(object):

"""

使用标准过程计算观测数、算术平均值、中值和样本标准差

"""

def__init__(self):

pass

defcalculate_number_observation(self,one_dimensional_array):

"""

计算观测数

:参数one_dimensional_array:numpy一维数组

:返回值观测数

"""

number_observation=0

try:

number_observation=one_dimensional_array.size

exceptException:

self.print_exception_message()

returnnumber_observation

defcalculate_arithmetic_mean(self,one_dimensional_array,number_observation):

"""

计算算术平均值

:参数one_dimensional_array:numpy一维数组

:参数number_observation:观测数

:返回值算术平均值

"""

arithmetic_mean=0.0

try:

sum_result=0.0

foriinrange(number_observation):

sum_result+=one_dimensional_array[i]

arithmetic_mean=sum_result/number_observation

exceptException:

self.print_exception_message()

returnarithmetic_mean

defcalculate_median(self,one_dimensional_array,number_observation):

"""

计算中值

:参数one_dimensional_array:numpy一维数组

:参数number_observation:观测数

:返回值中值

"""

median=0.0

try:

one_dimensional_array.sort()

half_position=number_observation//2

ifnotnumber_observation%2:

median=(one_dimensional_array[half_position-1]+one_dimensional_array[half_position])/2.0

else:

median=one_dimensional_array[half_position]

exceptException:

self.print_exception_message()

returnmedian

defcalculate_sample_standard_deviation(self,one_dimensional_array,number_observation,arithmetic_mean):

"""

计算样本标准差

:参数one_dimensional_array:numpy一维数组

:参数number_observation:观测数

:参数arithmetic_mean:算术平均值

:返回值样本标准差值

"""

sample_standard_deviation=0.0

try:

sum_result=0.0

foriinrange(number_observation):

sum_result+=pow((one_dimensional_array[i]-arithmetic_mean),2)

sample_variance=sum_result/(number_observation-1)

sample_standard_deviation=sqrt(sample_variance)

exceptException:

self.print_exception_message()

returnsample_standard_deviation

defprint_exception_message(self,message_orientation="horizontal"):

"""

打印完整的异常信息

:参数message_orientation:水平或垂直

:返回值空

"""

try:

exc_type,exc_value,exc_tb=sys.exc_info()

file_name,line_number,procedure_name,line_code=traceback.extract_tb(exc_tb)[-1]

time_stamp="[TimeStamp]:"+str(time.strftime("%Y-%m-%d%I:%M:%S%p"))

file_name="[FileName]:"+str(file_name)

procedure_name="[ProcedureName]:"+str(procedure_name)

error_message="[ErrorMessage]:"+str(exc_value)

error_type="[ErrorType]:"+str(exc_type)

line_number="[LineNumber]:"+str(line_number)

line_code="[LineCode]:"+str(line_code)

if(message_orientation=="horizontal"):

print("Anerroroccurred:{};{};{};{};{};{};{}".format(time_stamp,file_name,procedure_name,error_message,error_type,line_number,line_code))

elif(message_orientation=="vertical"):

print("Anerroroccurred:\n{}\n{}\n{}\n{}\n{}\n{}\n{}".format(time_stamp,file_name,procedure_name,error_message,error_type,line_number,line_code))

else:

pass

exceptException:

pass

列表1.单独使用NumPy数组的汇总统计类对象代码

列表2显示了汇总统计的主程序。可以看到,程序创建了summary_statistics类对象,然后调用其中的方法。该程序导入NumPy库以生成一维数组,并使用time模块中的clock()方法来计算程序的运行时间。

importtime

importnumpyasnp

fromclass_summary_statisticsimportSummaryStatistics

defmain(one_dimensional_array):

#创建汇总统计类对象

summary_statistics=SummaryStatistics()

#计算观测数

number_observation=summary_statistics.calculate_number_observation(one_dimensional_array)

print("NumberofObservation:{}".format(number_observation))

#计算算术平均值

arithmetic_mean=summary_statistics.calculate_arithmetic_mean(one_dimensional_array,number_observation)

print("ArithmeticMean:{}".format(arithmetic_mean))

#计算中值

median=summary_statistics.calculate_median(one_dimensional_array,number_observation)

print("Median:{}".format(median))

#计算样本标准差

sample_standard_deviation=summary_statistics.calculate_sample_standard_deviation(one_dimensional_array,number_observation,arithmetic_mean)

print("SampleStandardDeviation:{}".format(sample_standard_deviation))

if__name__=='__main__':

start_time=time.clock()

one_dimensional_array=np.arange(100000000,dtype=np.float64)

main(one_dimensional_array)

end_time=time.clock()

print("ProgramRuntime:{}seconds".format(round(end_time-start_time,1)))

列表2.使用NumPy数组的汇总统计类对象代码

当行数达到1百万时,汇总统计主程序将得到以下结果:

观测数:1000000

算术平均值:499999.5

中值:499999.5

样本标准差:288675.27893349814

程序运行时间:1.3秒

结合asyncio异步库的NumPy数组

列表3显示了结合Pythonasyncio异步库的汇总统计asyncio类对象代码。请注意,main()方法使用calculate_number_observation()作为第一个也是唯一任务来启动事件循环异步进程。

importsys

importtime

importtraceback

importasyncio

frommathimportsqrt

classSummaryStatisticsAsyncio(object):

"""

使用asyncion库计算观测数、算术平均值、中值和样本标准差

"""

def__init__(self):

pass

asyncdefcalculate_number_observation(self,one_dimensional_array):

"""

计算观测数

:参数one_dimensional_array:numpy一维数组

:返回值空

"""

try:

print('startcalculate_number_observation()procedure')

awaitasyncio.sleep(0)

number_observation=one_dimensional_array.size

print("NumberofObservation:{}".format(number_observation))

print("观测数:{}".format(number_observation))

awaitself.calcuate_arithmetic_mean(one_dimensional_array,number_observation)

print("finishedcalculate_number_observation()procedure")

exceptException:

self.print_exception_message()

asyncdefcalcuate_arithmetic_mean(self,one_dimensional_array,number_observation):

"""

计算算术平均值

:参数one_dimensional_array:numpy一维数组

:参数number_observation:观测数

:返回值空

"""

try:

print('startcalcuate_arithmetic_mean()procedure')

awaitself.calculate_median(one_dimensional_array,number_observation)

sum_result=0.0

awaitasyncio.sleep(0)

foriinrange(number_observation):

sum_result+=one_dimensional_array[i]

arithmetic_mean=sum_result/number_observation

print("ArithmeticMean:{}".format(arithmetic_mean))

awaitself.calculate_sample_standard_deviation(one_dimensional_array,number_observation,arithmetic_mean)

print("finishedcalcuate_arithmetic_mean()procedure")

exceptException:

self.print_exception_message()

asyncdefcalculate_median(self,one_dimensional_array,number_observation):

"""

计算中值

:参数one_dimensional_array:numpy一维数组

:参数number_observation:观测数

:返回值空

"""

try:

print('startingcalculate_median()')

awaitasyncio.sleep(0)

one_dimensional_array.sort()

half_position=number_observation//2

ifnotnumber_observation%2:

median=(one_dimensional_array[half_position-1]+one_dimensional_array[half_position])/2.0

else:

median=one_dimensional_array[half_position]

print("Median:{}".format(median))

print("finishedcalculate_median()procedure")

exceptException:

self.print_exception_message()

asyncdefcalculate_sample_standard_deviation(self,one_dimensional_array,number_observation,arithmetic_mean):

"""

计算样本标准差

:参数one_dimensional_array:numpy一维数组

:参数number_observation:观测数

:参数arithmetic_mean:算术平均值

:返回值空

"""

try:

print('startcalculate_sample_standard_deviation()procedure')

awaitasyncio.sleep(0)

sum_result=0.0

foriinrange(number_observation):

sum_result+=pow((one_dimensional_array[i]-arithmetic_mean),2)

sample_variance=sum_result/(number_observation-1)

sample_standard_deviation=sqrt(sample_variance)

print("SampleStandardDeviation:{}".format(sample_standard_deviation))

print("finishedcalculate_sample_standard_deviation()procedure")

exceptException:

self.print_exception_message()

defprint_exception_message(self,message_orientation="horizontal"):

"""

打印完整异常消息

:参数message_orientation:水平或垂直

:返回值空

"""

try:

exc_type,exc_value,exc_tb=sys.exc_info()

file_name,line_number,procedure_name,line_code=traceback.extract_tb(exc_tb)[-1]

time_stamp="[TimeStamp]:"+str(time.strftime("%Y-%m-%d%I:%M:%S%p"))

file_name="[FileName]:"+str(file_name)

procedure_name="[ProcedureName]:"+str(procedure_name)

error_message="[ErrorMessage]:"+str(exc_value)

error_type="[ErrorType]:"+str(exc_type)

line_number="[LineNumber]:"+str(line_number)

line_code="[LineCode]:"+str(line_code)

if(message_orientation=="horizontal"):

print("Anerroroccurred:{};{};{};{};{};{};{}".format(time_stamp,file_name,procedure_name,error_message,error_type,line_number,line_code))

elif(message_orientation=="vertical"):

print("Anerroroccurred:\n{}\n{}\n{}\n{}\n{}\n{}\n{}".format(time_stamp,file_name,procedure_name,error_message,error_type,line_number,line_code))

else:

pass

exceptException:

pass

defmain(self,one_dimensional_array):

"""

启动事件循环异步进程

:参数one_dimensional_array:numpy一维数组

"""

try:

ioloop=asyncio.get_event_loop()

tasks=[ioloop.create_task(self.calculate_number_observation(one_dimensional_array))]

wait_tasks=asyncio.wait(tasks)

ioloop.run_until_complete(wait_tasks)

ioloop.close()

exceptException:

列表3.结合Python异步库的汇总统计asyncio类对象代码

汇总统计asyncio主程序如列表4所示。可以看到main()方法是唯一被调用的方法。

importtime

importnumpyasnp

fromclass_summary_statistics_asyncioimportSummaryStatisticsAsyncio

defmain(one_dimensional_array):

#新建汇总统计asyncio类对象

summary_statistics_asyncio=SummaryStatisticsAsyncio()

#调用main方法

summary_statistics_asyncio.main(one_dimensional_array)

if__name__=='__main__':

start_time=time.clock()

one_dimensional_array=np.arange(1000000000,dtype=np.float64)

main(one_dimensional_array)

end_time=time.clock()

print("ProgramRuntime:{}seconds".format(round(end_time-start_time,1)))

列表4.结合Python异步库的汇总统计asyncio主程序代码

当行数达到1百万时,结合asyncio库的汇总统计主程序将得到以下结果。我加入了开始/结束过程的打印,以展示异步过程在这种特殊情况下的工作原理。

startcalculate_number_observation()procedure

观测数:1000000000

startcalcuate_arithmetic_mean()procedure

startingcalculate_median()

中值:499999.5

finishedcalculate_median()procedure

算术平均值:499999.5

startcalculate_sample_standard_deviation()procedure

样本标准差:288675.27893349814

finishedcalculate_sample_standard_deviation()procedure

finishedcalcuate_arithmetic_mean()procedure

finishedcalculate_number_observation()procedure

程序运行时间:1504.4秒

结合Numba库的NumPy数组

结合Numba库的汇总统计类对象代码如列表5所示。你可以访问Numba在GitHub上的目录,以了解更多关于这个Python的开源NumPy感知优化编译器的信息。值得一提的是Numba支持CUDAGPU编程。下面的代码中,调试代码已被删除,以便在编译模式下运行该程序。

importtime

fromnumbaimportjit

importnumpyasnp

frommathimportsqrt

classSummaryStatisticsNumba(object):

"""

结合numba库计算观测数、算术平均值、中值和样本标准差

"""

def__init__(self):

pass

@jit

defcalculate_number_observation(self,one_dimensional_array):

"""

计算观测数

:参数one_dimensional_array:numpy一维数组

:返回值观测数

"""

number_observation=one_dimensional_array.size

returnnumber_observation

@jit

defcalcuate_arithmetic_mean(self,one_dimensional_array,number_observation):

"""

计算算术平均值

:参数one_dimensional_array:numpy一维数组

:参数number_observation:观测数

:返回值算术平均值

"""

sum_result=0.0

foriinrange(number_observation):

sum_result+=one_dimensional_array[i]

arithmetic_mean=sum_result/number_observation

returnarithmetic_mean

@jit

defcalculate_median(self,one_dimensional_array,number_observation):

"""

计算中值

:参数one_dimensional_array:指numpy一维数组

:参数number_observation:观测数

:返回值中值

"""

one_dimensional_array.sort()

half_position=number_observation//2

ifnotnumber_observation%2:

median=(one_dimensional_array[half_position-1]+one_dimensional_array[half_position])/2.0

else:

median=one_dimensional_array[half_position]

returnmedian

@jit

defcalculate_sample_standard_deviation(self,one_dimensional_array,number_observation,arithmetic_mean):

"""

计算样本标准差

:参数one_dimensional_array:numpy一维数组

:参数number_observation:观测数

:参数arithmetic_mean:算术平均值

:返回值样本标准差值

"""

sum_result=0.0

foriinrange(number_observation):

sum_result+=pow((one_dimensional_array[i]-arithmetic_mean),2)

sample_variance=sum_result/(number_observation-1)

sample_standard_deviation=sqrt(sample_variance)

returnsample_standard_deviation

列表5结合Numba库的汇总统计类对象代码

汇总统计Numba主程序如列表6所示。

importtime

importnumpyasnp

fromclass_summary_statistics_numbaimportSummaryStatisticsNumba

defmain(one_dimensional_array):

#创建类汇总统计numba类对象

class_summary_statistics_numba=SummaryStatisticsNumba()

#计算观测数

number_observation=class_summary_statistics_numba.calculate_number_observation(one_dimensional_array)

print("NumberofObservation:{}".format(number_observation))

#计算算术平均值

arithmetic_mean=class_summary_statistics_numba.calcuate_arithmetic_mean(one_dimensional_array,number_observation)

print("ArithmeticMean:{}".format(arithmetic_mean))

#计算中值

median=class_summary_statistics_numba.calculate_median(one_dimensional_array,number_observation)

print("Median:{}".format(median))

#计算样本标准差

sample_standard_deviation=class_summary_statistics_numba.calculate_sample_standard_deviation(one_dimensional_array,number_observation,arithmetic_mean)

print("SampleStandardDeviation:{}".format(sample_standard_deviation))

if__name__=='__main__':

start_time=time.clock()

one_dimensional_array=np.arange(1000000000,dtype=np.float64)

main(one_dimensional_array)

end_time=time.clock()

print("ProgramRuntime:{}seconds".format(round(end_time-start_time,1)))

列表6.汇总统计Numba主程序

当行数达到十亿时,汇总统计Numba主程序将得到以下结果。

观测数:1000000000

算术平均值:499999999.067109

中值:499999999.5

样本标准差:288675134.73899055

程序运行时间:40.2秒

NumPy数组达到十亿行时,计算在40.2秒内就完成了,这真是一个激动人心的结果。我认为现在是时候在大数据项目中更多地使用Numba库和NumPy数组了。当然某些特殊场景可能还需要进一步研究和测试。

笔记本硬件参数

下面是我运行上面这些Python程序所使用的笔记本电脑硬件参数:

Windows1064位操作系统

英特尔酷睿™i7-2670QMCPU@2.20GHz

16GB内存

程序运行时间对比

表1显示了数据行数为100万、1000万、1亿和10亿时不同程序的运行时间。

表1:程序运行时间对比

结论

单独使用NumPy数组与结合asyncio异步库的NumPy数组之间没有明显差别。由于本次计算量不足以证明Python数据科学项目中asyncio异步库的性能,因此可能需要进行更多的研究来找到它适合的应用场景。

与单独使用NumPy数组或结合asyncio异步库的NumPy数组相比,将NumPy数组与Numba库组合具有最佳的数据操作和分析性能。当Numpy数组中有十亿行数据时,执行时间竟然只需要40.2秒,令我印象深刻。我怀疑目前的R程序是否也可以达到这样的速度。如果不能,也许现在就是R程序的程序员学习Python及其数据生态系统库的时候了。除此之外,请务必采用持续集成软件开发和部署实践,使用面向对象编程方法论来为实际的生产环境编写Python程序。

在具有不同类型数据源的数据管道和提取-转换-加载(ETL)系统项目中,使用结合Numba库的NumPy数组是目前大数据分析的最佳编程实践之一。你不再需要使用PythonList对象。

最后,献上我在软件业务应用设计和开发领域摸爬滚打25年之后最喜欢的句子之一:

“一个差的程序员写代码只是为了能运行,而一个好的程序员写代码不只是为了运行程序,也为了以后可以更方便地维护和更新”-ErnestBonat博士。

Python培训,就选光环大数据Python培训机构python学习地址:http://hadoop.aura.cn/python/


大数据培训、人工智能培训、Python培训、大数据培训机构、大数据培训班、数据分析培训、大数据可视化培训,就选光环大数据!光环大数据,聘请专业的大数据领域知名讲师,确保教学的整体质量与教学水准。讲师团及时掌握时代潮流技术,将前沿技能融入教学中,确保学生所学知识顺应时代所需。通过深入浅出、通俗易懂的教学方式,指导学生更快的掌握技能知识,成就上万个高薪就业学子。 更多问题咨询,欢迎点击------>>>>在线客服

你可能也喜欢这些

在线客服咨询

领取资料

X
立即免费领取

请准确填写您的信息

点击领取
#第三方统计代码(模版变量) '); })();
'); })();