Only single thread executes parallel SQL query with PySpark using multiprocessing pool












I have a case where I am using PySpark (or Spark, if I can't do it with Python and instead need to use Scala or Java) to pull data from several hundred database tables that lack primary keys. (Why Oracle would ever create an ERP product that contains tables without primary keys is a different subject... but regardless, we need to be able to pull the data from each database table and save it into a Parquet file.) I originally tried using Sqoop instead of PySpark, but due to a number of issues we ran into, it made more sense to try PySpark/Spark instead.



Ideally, I'd like to have each task node in my compute cluster: take the name of a table, query that table from the database, and save that table as a Parquet file (or set of Parquet files) in S3. My first step is to get it working locally in standalone mode. (If I had a primary key for each given table, then I could partition the query and file saving process across different sets of rows for the given table and distribute the row partitions across the task nodes in the compute cluster to perform the file saving operation in parallel, but because Oracle's ERP product lacks primary keys for the tables of concern, that's not an option.)
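
For context, here is a sketch (my own illustration, not something from the actual setup) of what such a partitioned read would look like if a numeric key existed; the column name ROW_ID, the bounds, and the connection placeholders are hypothetical:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()

# Hypothetical partitioned JDBC read: only possible when a numeric column
# (assumed here to be ROW_ID) exists to split the table on. Spark issues one
# query per partition, so the work spreads across the executors.
partitionedDF = (spark.read
                 .format("jdbc")
                 .option("url", "OURCONNECTIONURL;")
                 .option("driver", "com.netsuite.jdbc.openaccess.OpenAccessDriver")
                 .option("dbtable", "SOME_TABLE")
                 .option("user", "USERNAME")
                 .option("password", "PASSWORD")
                 .option("partitionColumn", "ROW_ID")  # hypothetical numeric key
                 .option("lowerBound", 1)
                 .option("upperBound", 1000000)        # hypothetical key range
                 .option("numPartitions", 8)
                 .load())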



I'm able to successfully query the target database with PySpark and save the data into a Parquet file, but when I try to parallelize the work, only a single thread does anything. What happens is that a single thread takes a tableName, queries the database, and saves the result to the desired directory as a Parquet file; then the job ends as if no other threads were executed. I'm guessing that there may be some type of locking issue taking place.
If I correctly understood the comments here: How to run multiple jobs in one Sparkcontext from separate threads in PySpark?
then what I'm trying to do should be possible unless there are specific issues related to executing parallel JDBC SQL queries.



Edit: I'm specifically looking for a way that allows me to use a thread pool of some type so that I don't need to manually create a thread for each one of the tables that I need to process and manually load-balance them across the task nodes in my cluster.



Even when I tried setting:



--master local[*]


and



--conf 'spark.scheduler.mode=FAIR'


the problem remained.



Also, to briefly explain my code, I needed to use a custom JDBC driver, and I'm running the code in a Jupyter notebook on Windows, so I'm using a workaround to ensure that PySpark starts with the correct parameters.
(For the record, I have nothing against other operating systems, but my Windows machine is my fastest workstation, so that's why I'm using it.)



Here's my setup:



import os

driverPath = r"C:\src\NetSuiteJDBC\NQjc.jar"
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--driver-class-path '{0}' --jars '{0}' --master local[*] --conf 'spark.scheduler.mode=FAIR' "
    "--conf 'spark.scheduler.allocation.file=C:\\src\\PySparkConfigs\\fairscheduler.xml' pyspark-shell"
).format(driverPath)

import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Column, Row, SQLContext
from pyspark.sql.functions import col, split, regexp_replace, when
from pyspark.sql.types import ArrayType, IntegerType, StringType

spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
spark.sparkContext.setLogLevel("INFO")
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")
sc = SparkContext.getOrCreate()
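
As a quick sanity check (my addition, not part of the original post), the flags passed through PYSPARK_SUBMIT_ARGS can be read back from the live context to confirm that the FAIR scheduler and the local[*] master actually took effect:

# Read the effective configuration back from the running SparkContext.
print(spark.sparkContext.master)                                         # expect: local[*]
print(spark.sparkContext.getConf().get("spark.scheduler.mode", "FIFO"))  # expect: FAIR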


Then, to test the multiprocessing, I created the file sparkMethods.py in the directory where I'm running my Jupyter notebook and put this method in it:



def testMe(x):
    return x * x


When I run:



from multiprocessing import Pool
import sparkMethods

if __name__ == '__main__':
    pool = Pool(processes=4)  # start 4 worker processes

    # prints "[0, 1, 4, ..., 81]"
    print(pool.map(sparkMethods.testMe, range(10)))


in my Jupyter notebook, I get the expected output:




[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]




Now, before anyone rails against the way I wrote the next method, please know that I initially tried passing the Spark context via a closure and then ran into a pickling error, as documented here: I can "pickle local objects" if I use a derived class?
So, I included all of the Spark session setup in this next method that I put into the sparkMethods.py file (at least until I can find a better way). The reason I put the methods into an external file (instead of defining them directly in the Jupyter notebook) was to work around this problem: https://bugs.python.org/issue25053
as discussed here:
Multiprocessing example giving AttributeError
and here:
python multiprocessing: AttributeError: Can't get attribute "abc"



Here is the method that contains the logic for making the JDBC connection:



# In sparkMethods.py file:
def getAndSaveTableInPySpark(tableName):
    import os.path
    from pyspark.sql import SparkSession

    # Recreate (or reuse) the session inside the worker, since a SparkSession
    # cannot be pickled and passed in from the parent process.
    spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
    spark.sparkContext.setLogLevel("INFO")
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")

    jdbcDF = (spark.read
              .format("jdbc")
              .option("url", "OURCONNECTIONURL;")
              .option("driver", "com.netsuite.jdbc.openaccess.OpenAccessDriver")
              .option("dbtable", tableName)
              .option("user", "USERNAME")
              .option("password", "PASSWORD")
              .load())

    # Spark writes the Parquet output as a directory of part files.
    filePath = os.path.join("C:\\src\\NetsuiteSparkProject", tableName, tableName + ".parquet")
    jdbcDF.write.parquet(filePath)

    if os.path.exists(filePath):
        return filePath + " exists!"
    else:
        return filePath + " could not be written!"


Then, back in my Jupyter notebook, I run:



import sparkMethods
from multiprocessing import Pool

if __name__ == '__main__':
    with Pool(5) as p:
        p.map(sparkMethods.getAndSaveTableInPySpark, top5Tables)


The problem is that only one thread seems to execute.



When I execute it, the console output initially includes this:




The process cannot access the file because it is being used by another process.
The system cannot find the file
C:\Users\DEVIN~1.BOS\AppData\Local\Temp\spark-class-launcher-output-3662.txt.
. . .




which leads me to suspect that perhaps there is some type of locking taking place.



Regardless, one of the threads always runs to completion, successfully queries its corresponding table, and saves it to a Parquet file as desired. There is some non-determinism in the process: different executions result in a different thread winning the race and consequently processing a different table.
Interestingly, only a single job gets executed, as shown in the Spark UI:
(Spark UI screenshot showing that only one Spark job was executed.)
However, the article here: https://medium.com/@rbahaguejr/threaded-tasks-in-pyspark-jobs-d5279844dac0
implies that I should expect to see multiple jobs in the Spark UI if they were successfully started.



Now, if the problem is that PySpark is not actually capable of running multiple JDBC queries in parallel across different task nodes, then perhaps my solution would be to use a JDBC connection pool or even just open a connection for each table (as long as I close the connection at the end of the thread).
When getting the list of tables to process, I had success with connecting to the database through the jaydebeapi library like this:



import jaydebeapi
import pandas as pd

conn = jaydebeapi.connect("com.netsuite.jdbc.openaccess.OpenAccessDriver",
                          "OURCONNECTIONURL;",
                          ["USERNAME", "PASSWORD"],
                          r"C:\src\NetSuiteJDBC\NQjc.jar")

top5Tables = list(pd.read_sql("SELECT TOP 5 TABLE_NAME FROM OA_TABLES WHERE TABLE_OWNER != 'SYSTEM';", conn)["TABLE_NAME"].values)
conn.close()
top5Tables


Output is:



['SALES_TERRITORY_PLAN_PARTNER',
'WORK_ORDER_SCHOOLS_TO_INSTALL_MAP',
'ITEM_ACCOUNT_MAP',
'PRODUCT_TRIAL_STATUS',
'ACCOUNT_PERIOD_ACTIVITY']


So, conceivably, if the problem is that PySpark cannot be used to distribute multiple queries across task nodes like this, then perhaps I can use the jaydebeapi library to make the connection. However, in that case, I'd still need a way to write the output of the JDBC SQL query to a Parquet file (ideally one that leverages Spark's schema inference capability), but I'm open to taking that approach if it's feasible.
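
As a rough sketch of that fallback (my own illustration, untested): pull each table through jaydebeapi into a pandas DataFrame, hand it to the existing SparkSession so that Spark infers the schema, and write the Parquet file from there. The function name saveTableViaJaydebeapi is mine, and the connection details and output directory simply mirror the placeholders used above:

import os.path

import jaydebeapi
import pandas as pd
from pyspark.sql import SparkSession

def saveTableViaJaydebeapi(tableName):
    # One JDBC connection per table, closed as soon as the query finishes.
    conn = jaydebeapi.connect("com.netsuite.jdbc.openaccess.OpenAccessDriver",
                              "OURCONNECTIONURL;",
                              ["USERNAME", "PASSWORD"],
                              r"C:\src\NetSuiteJDBC\NQjc.jar")
    try:
        # Note: unlike spark.read.jdbc, this pulls the whole table into driver memory.
        pandasDF = pd.read_sql("SELECT * FROM " + tableName, conn)
    finally:
        conn.close()

    # Let Spark infer the schema from the pandas DataFrame and write Parquet.
    spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
    filePath = os.path.join("C:\\src\\NetsuiteSparkProject", tableName, tableName + ".parquet")
    spark.createDataFrame(pandasDF).write.mode("overwrite").parquet(filePath)
    return filePath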



So, how do I successfully query the database and save the output to Parquet files in parallel (i.e. distributed across the task nodes) without the master node performing all of the querying sequentially?










  • I think the problem is that you shouldn't use multiprocessing but multithreading, since you don't (and shouldn't) create multiple Spark contexts. What you need is multiple workers, not multiple drivers.
    – Sraw
    Nov 21 '18 at 2:12










  • Possible duplicate of How to run independent transformations in parallel using PySpark?
    – user10465355
    Nov 21 '18 at 2:18










  • What does this have to do with NetSuite? FWIW, all tables in NetSuite have a primary key (generally a sequence).
    – bknights
    Nov 21 '18 at 7:14










  • @bknights When was the last time you joined OA_TABLES to OA_FKEYS?
    – devinbost
    Nov 21 '18 at 17:02










  • recently. What are you trying to find?
    – bknights
    Nov 21 '18 at 20:17
















1 Answer
With some hints provided by the comments in response to my question, as well as the answer here: How to run independent transformations in parallel using PySpark?
I investigated the use of threading instead of multiprocessing.
I took a more careful look at one of the answers here: How to run multiple jobs in one Sparkcontext from separate threads in PySpark?
and noticed the use of:



from multiprocessing.pool import ThreadPool


I was able to make it work like this:



from multiprocessing.pool import ThreadPool
pool = ThreadPool(5)
results = pool.map(sparkMethods.getAndSaveTableInPySpark, top5Tables)
pool.close()
pool.join()
print(*results, sep='\n')


which prints:



C:\src\NetsuiteSparkProject\SALES_TERRITORY_PLAN_PARTNER\SALES_TERRITORY_PLAN_PARTNER.parquet exists!
C:\src\NetsuiteSparkProject\WORK_ORDER_SCHOOLS_TO_INSTALL_MAP\WORK_ORDER_SCHOOLS_TO_INSTALL_MAP.parquet exists!
C:\src\NetsuiteSparkProject\ITEM_ACCOUNT_MAP\ITEM_ACCOUNT_MAP.parquet exists!
C:\src\NetsuiteSparkProject\PRODUCT_TRIAL_STATUS\PRODUCT_TRIAL_STATUS.parquet exists!
C:\src\NetsuiteSparkProject\ACCOUNT_PERIOD_ACTIVITY\ACCOUNT_PERIOD_ACTIVITY.parquet exists!
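
For what it's worth, the same pattern can also be written with concurrent.futures instead of multiprocessing.pool.ThreadPool. This is just a sketch that assumes the same sparkMethods module and top5Tables list as above:

from concurrent.futures import ThreadPoolExecutor

import sparkMethods

# The threads all share the single driver-side SparkSession, so each call
# simply submits another job to the same SparkContext.
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(sparkMethods.getAndSaveTableInPySpark, top5Tables))

print(*results, sep='\n')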




