%load_ext autoreload
%autoreload 2
import sys
sys.path.append("..")
from optimus import Optimus
# Create an Optimus instance
op = Optimus(master="local", app_name="optimus", verbose=True)
You are using PySparkling of version 2.4.10, but your PySpark is of version 2.3.1. Please make sure Spark and PySparkling versions are compatible.
INFO:optimus:Operative System:Windows
INFO:optimus:Just check that Spark and all necessary environments vars are present...
INFO:optimus:-----
INFO:optimus:SPARK_HOME=C:\opt\spark\spark-2.3.1-bin-hadoop2.7
INFO:optimus:HADOOP_HOME=C:\opt\hadoop-2.7.7
INFO:optimus:PYSPARK_PYTHON=C:\Users\argenisleon\Anaconda3\python.exe
INFO:optimus:PYSPARK_DRIVER_PYTHON=jupyter
INFO:optimus:PYSPARK_SUBMIT_ARGS=--jars "file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/spark-redis-2.4.1-SNAPSHOT-jar-with-dependencies.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/RedshiftJDBC42-1.2.16.1027.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/mysql-connector-java-8.0.16.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/ojdbc8.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/postgresql-42.2.5.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/presto-jdbc-0.224.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/spark-cassandra-connector_2.11-2.4.1.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/sqlite-jdbc-3.27.2.1.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/mssql-jdbc-7.4.1.jre8.jar" --driver-class-path "C:/Users/argenisleon/Documents/Optimus/optimus/jars/spark-redis-2.4.1-SNAPSHOT-jar-with-dependencies.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/RedshiftJDBC42-1.2.16.1027.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/mysql-connector-java-8.0.16.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/ojdbc8.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/postgresql-42.2.5.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/presto-jdbc-0.224.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/spark-cassandra-connector_2.11-2.4.1.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/sqlite-jdbc-3.27.2.1.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/mssql-jdbc-7.4.1.jre8.jar" --conf "spark.sql.catalogImplementation=hive" pyspark-shell
INFO:optimus:JAVA_HOME=C:\java
INFO:optimus:Pyarrow Installed
INFO:optimus:-----
INFO:optimus:Starting or getting SparkSession and SparkContext...
INFO:optimus:Spark Version:2.3.1
INFO:optimus:[Optimus ASCII-art logo]
INFO:optimus:Transform and Roll out...
INFO:optimus:Optimus successfully imported. Have fun :).
INFO:optimus:Config.ini not found
# Put your db credentials here
db = op.connect(
    driver="mysql",
    host="165.227.196.70",
    database="optimus",
    user="test",
    password="test")
INFO:optimus:jdbc:mysql://165.227.196.70:3306/optimus?currentSchema=public
db.tables()
INFO:optimus:(SELECT table_name, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'optimus' LIMIT 10 ) AS t
INFO:optimus:jdbc:mysql://165.227.196.70:3306/optimus?currentSchema=public
| TABLE_NAME (string) | TABLE_ROWS (decimal(20,0)) |
|---|---|
| test_data | 100 |
# db.execute("SHOW KEYS FROM test_data WHERE key_name = 'PRIMARY'")
db.execute("SELECT * FROM test_data").ext.display()
INFO:optimus:(SELECT * FROM test_data LIMIT 10 ) AS t
INFO:optimus:jdbc:mysql://165.227.196.70:3306/optimus?currentSchema=public
| id (int) | first_name (string) | last_name (string) | email (string) | gender (string) | ip_address (string) |
|---|---|---|---|---|---|
| 1 | Ikey | Crudginton | icrudginton0@freewebs.com | Male | 72.210.21.255 |
| 2 | Erwin | Edden | eedden1@nytimes.com | Male | 16.205.155.142 |
| 3 | Rudyard | Dullaghan | rdullaghan2@techcrunch.com | Male | 84.170.67.167 |
| 4 | Eugen | Staining | estaining3@merriam-webster.com | Male | 211.36.45.228 |
| 5 | Carleton | Hammond | chammond4@example.com | Male | 177.7.250.134 |
| 6 | Ermengarde | Knightly | eknightly5@google.co.jp | Female | 231.176.117.190 |
| 7 | Myles | Rattray | mrattray6@about.com | Male | 4.193.247.67 |
| 8 | Banky | Shires | bshires7@so-net.ne.jp | Male | 16.18.210.158 |
| 9 | Chastity | Birtwell | cbirtwell8@seesaa.net | Female | 167.15.222.219 |
| 10 | Harv | Fotherby | hfotherby9@godaddy.com | Male | 143.117.248.106 |
db.execute("SELECT * FROM test_data", partition_column ="id", table_name = "test_data").ext.display()
INFO:optimus:(SELECT * FROM test_data LIMIT 10 ) AS t
INFO:optimus:jdbc:mysql://165.227.196.70:3306/optimus?currentSchema=public
INFO:optimus:(SELECT min(id) AS min, max(id) AS max FROM test_data LIMIT 10 ) AS t
INFO:optimus:jdbc:mysql://165.227.196.70:3306/optimus?currentSchema=public
| id (int) | first_name (string) | last_name (string) | email (string) | gender (string) | ip_address (string) |
|---|---|---|---|---|---|
| 1 | Ikey | Crudginton | icrudginton0@freewebs.com | Male | 72.210.21.255 |
| 2 | Erwin | Edden | eedden1@nytimes.com | Male | 16.205.155.142 |
| 3 | Rudyard | Dullaghan | rdullaghan2@techcrunch.com | Male | 84.170.67.167 |
| 4 | Eugen | Staining | estaining3@merriam-webster.com | Male | 211.36.45.228 |
| 5 | Carleton | Hammond | chammond4@example.com | Male | 177.7.250.134 |
| 6 | Ermengarde | Knightly | eknightly5@google.co.jp | Female | 231.176.117.190 |
| 7 | Myles | Rattray | mrattray6@about.com | Male | 4.193.247.67 |
| 8 | Banky | Shires | bshires7@so-net.ne.jp | Male | 16.18.210.158 |
| 9 | Chastity | Birtwell | cbirtwell8@seesaa.net | Female | 167.15.222.219 |
| 10 | Harv | Fotherby | hfotherby9@godaddy.com | Male | 143.117.248.106 |
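Passing `partition_column` is what triggers the extra `SELECT min(id) ... max(id)` query in the log: Optimus uses those bounds to split the JDBC read across several Spark partitions instead of pulling everything through a single connection. As a rough mental model, this maps onto Spark's standard partitioned JDBC read; a minimal sketch, assuming `op.spark` exposes the underlying SparkSession (the bounds and partition count here are illustrative, not Optimus internals):

# Hypothetical plain-PySpark equivalent of the partitioned read above.
# Spark issues one query per partition, each covering a slice of `id`.
df = op.spark.read.jdbc(
    url="jdbc:mysql://165.227.196.70:3306/optimus",
    table="test_data",
    column="id",        # numeric partition column
    lowerBound=1,       # min(id) reported by the bounds query
    upperBound=100,     # max(id) reported by the bounds query
    numPartitions=4,    # assumed parallelism
    properties={"user": "test", "password": "test"})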
db.table_to_df("test_data", partition_column ="id").ext.display()
INFO:optimus:SELECT * FROM test_data
INFO:optimus:(SELECT * FROM test_data LIMIT 10 ) AS t
INFO:optimus:jdbc:mysql://165.227.196.70:3306/optimus?currentSchema=public
INFO:optimus:(SELECT min(id) AS min, max(id) AS max FROM test_data LIMIT 10 ) AS t
INFO:optimus:jdbc:mysql://165.227.196.70:3306/optimus?currentSchema=public
| id (int) | first_name (string) | last_name (string) | email (string) | gender (string) | ip_address (string) |
|---|---|---|---|---|---|
| 1 | Ikey | Crudginton | icrudginton0@freewebs.com | Male | 72.210.21.255 |
| 2 | Erwin | Edden | eedden1@nytimes.com | Male | 16.205.155.142 |
| 3 | Rudyard | Dullaghan | rdullaghan2@techcrunch.com | Male | 84.170.67.167 |
| 4 | Eugen | Staining | estaining3@merriam-webster.com | Male | 211.36.45.228 |
| 5 | Carleton | Hammond | chammond4@example.com | Male | 177.7.250.134 |
| 6 | Ermengarde | Knightly | eknightly5@google.co.jp | Female | 231.176.117.190 |
| 7 | Myles | Rattray | mrattray6@about.com | Male | 4.193.247.67 |
| 8 | Banky | Shires | bshires7@so-net.ne.jp | Male | 16.18.210.158 |
| 9 | Chastity | Birtwell | cbirtwell8@seesaa.net | Female | 167.15.222.219 |
| 10 | Harv | Fotherby | hfotherby9@godaddy.com | Male | 143.117.248.106 |
df = db.table_to_df("test_data", limit=None)
INFO:optimus:SELECT * FROM test_data
INFO:optimus:(SELECT * FROM test_data LIMIT 10 ) AS t
INFO:optimus:jdbc:mysql://165.227.196.70:3306/optimus?currentSchema=public
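Note the `LIMIT 10` in the logged preview queries: by default Optimus only pulls a small sample through JDBC. Judging by the logs, `limit=None` here still emits the `LIMIT 10` preview, while `limit="all"` (used in the SQLite example further down) first issues a `COUNT(*)` and then fetches the whole table.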
db.tables_names_to_json()
INFO:optimus:(SELECT TABLE_NAME AS table_name FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'optimus' GROUP BY TABLE_NAME ) AS t
INFO:optimus:jdbc:mysql://165.227.196.70:3306/optimus?currentSchema=public
['test_data']
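Because `tables_names_to_json()` returns a plain Python list, it combines naturally with `table_to_df` to load a whole schema. A small sketch using only the calls shown in this notebook (the `dfs` name is ours):

# Load every table in the schema into a dict of Spark DataFrames.
dfs = {}
for name in db.tables_names_to_json():
    dfs[name] = db.table_to_df(name, limit=None)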
# Put your db credentials here
db = op.connect(
    driver="postgresql",
    host="165.227.196.70",
    database="optimus",
    user="testuser",
    password="test")
INFO:optimus:jdbc:postgresql://165.227.196.70:5432/optimus?currentSchema=public
db.tables()
db.table_to_df("test_data").table()
db.tables_names_to_json()
INFO:optimus:( SELECT relname as table_name FROM pg_class C LEFT JOIN pg_namespace N ON (N.oid = C.relnamespace) WHERE nspname IN ('public') AND relkind='r' ORDER BY reltuples DESC) AS t
INFO:optimus:jdbc:postgresql://165.227.196.70:5432/optimus?currentSchema=public
['test_data']
# Put your db credentials here
db = op.connect(
    driver="sqlserver",
    host="165.227.196.70",
    database="optimus",
    user="test",
    password="test*0261")
INFO:optimus:jdbc:sqlserver://165.227.196.70:1433;databaseName=optimus
db.tables()
INFO:optimus:(SELECT * FROM INFORMATION_SCHEMA.TABLES) AS t
INFO:optimus:jdbc:sqlserver://165.227.196.70:1433;databaseName=optimus
| TABLE_CATALOG (string) | TABLE_SCHEMA (string) | TABLE_NAME (string) | TABLE_TYPE (string) |
|---|---|---|---|
| optimus | dbo | test_data | BASE TABLE |
db.table_to_df("test_data").table()
INFO:optimus:SELECT * FROM test_data
INFO:optimus:(SELECT * FROM test_data) AS t
INFO:optimus:jdbc:sqlserver://165.227.196.70:1433;databaseName=optimus
| id (int) | first_name (string) | last_name (string) | email (string) | gender (string) | ip_address (string) |
|---|---|---|---|---|---|
| 1 | Keenan | McAirt | kmcairt0@spotify.com | Male | 68.97.227.147 |
| 2 | Fredelia | Lemarie | flemarie1@furl.net | Female | 16.145.123.46 |
| 1 | Keenan | McAirt | kmcairt0@spotify.com | Male | 68.97.227.147 |
| 2 | Fredelia | Lemarie | flemarie1@furl.net | Female | 16.145.123.46 |
| 1 | Keenan | McAirt | kmcairt0@spotify.com | Male | 68.97.227.147 |
| 2 | Fredelia | Lemarie | flemarie1@furl.net | Female | 16.145.123.46 |
| 1 | Keenan | McAirt | kmcairt0@spotify.com | Male | 68.97.227.147 |
| 2 | Fredelia | Lemarie | flemarie1@furl.net | Female | 16.145.123.46 |
| 2 | Fredelia | Lemarie | flemarie1@furl.net | Female | 16.145.123.46 |
| 1 | Evyn | Abbey | eabbey0@mlb.com | Male | 202.99.246.227 |
db.tables_names_to_json()
INFO:optimus:(SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES) AS t
INFO:optimus:jdbc:sqlserver://165.227.196.70:1433;databaseName=optimus
['test_data']
# Put your db credentials here
db = op.connect(
    driver="redshift",
    host="165.227.196.70",
    database="optimus",
    user="testuser",
    password="test")
INFO:optimus:jdbc:redshift://redshift-cluster-1.chuvgsqx7epn.us-east-1.redshift.amazonaws.com:5439/dev?currentSchema=public
db.tables()
INFO:optimus:( SELECT relname as table_name,cast (reltuples as integer) AS count FROM pg_class C LEFT JOIN pg_namespace N ON (N.oid = C.relnamespace) WHERE nspname IN ('public') AND relkind='r' ORDER BY reltuples DESC) AS t
INFO:optimus:jdbc:redshift://redshift-cluster-1.chuvgsqx7epn.us-east-1.redshift.amazonaws.com:5439/dev?currentSchema=public
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-3-cdef22199e9a> in <module>
----> 1 db.tables()

~\Documents\Optimus\optimus\io\jdbc.py in tables(self, schema, database, limit)
    179                     FROM user_tables ORDER BY table_name"""
    180
--> 181         df = self.execute(query, limit)
    182         return df.table(limit)
    183

~\Documents\Optimus\optimus\io\jdbc.py in execute(self, query, limit)
    309             conf.options(table=self.cassandra_table, keyspace=self.cassandra_keyspace)
    310
--> 311         return self._limit(conf.load(), limit)
    312
    313     def df_to_table(self, df, table, mode="overwrite"):

~\Anaconda3\lib\site-packages\pyspark\sql\readwriter.py in load(self, path, format, schema, **options)
    170             return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    171         else:
--> 172             return self._df(self._jreader.load())
    173
    174     @since(1.4)

~\Anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

~\Anaconda3\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

~\Anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o41.load.
: java.sql.SQLException: [Amazon](500150) Error setting/closing connection: Connection timed out: connect.
	at com.amazon.redshift.client.PGClient.connect(Unknown Source)
	at com.amazon.redshift.client.PGClient.<init>(Unknown Source)
	at com.amazon.redshift.core.PGJDBCConnection.connect(Unknown Source)
	at com.amazon.jdbc.common.BaseConnectionFactory.doConnect(Unknown Source)
	at com.amazon.jdbc.common.AbstractDriver.connect(Unknown Source)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:115)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
Caused by: com.amazon.support.exceptions.GeneralException: [Amazon](500150) Error setting/closing connection: Connection timed out: connect.
	... 24 more
Caused by: java.net.ConnectException: Connection timed out: connect
	at sun.nio.ch.Net.connect0(Native Method)
	at sun.nio.ch.Net.connect(Net.java:454)
	at sun.nio.ch.Net.connect(Net.java:446)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
	at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:96)
	at com.amazon.redshift.client.PGClient.connect(Unknown Source)
	at com.amazon.redshift.client.PGClient.<init>(Unknown Source)
	at com.amazon.redshift.core.PGJDBCConnection.connect(Unknown Source)
	at com.amazon.jdbc.common.BaseConnectionFactory.doConnect(Unknown Source)
	at com.amazon.jdbc.common.AbstractDriver.connect(Unknown Source)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:115)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
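The failure itself has nothing to do with Optimus: the wrapped `java.net.ConnectException: Connection timed out` means the Redshift cluster was simply unreachable from this machine, typically because of the cluster's security group, VPC routing, or its publicly-accessible setting. Confirm that port 5439 is open from your network before retrying `db.tables()` or `db.table_to_df()`.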
db.table_to_df("test_data").table()
# Put your db credentials here
db = op.connect(
    driver="oracle",
    host="165.227.196.70",
    database="optimus",
    user="testuser",
    password="test")
# Put your db credentials here
db = op.connect(
    driver="sqlite",
    host="chinook.db",
    database="employes",
    user="testuser",
    password="test")
INFO:optimus:jdbc:sqlite:chinook.db
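Note that for the `sqlite` driver the `host` argument is the path to the database file, which Optimus turns into the `jdbc:sqlite:chinook.db` URL shown above; SQLite has no user/password concept, so those arguments are presumably ignored.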
db.tables()
INFO:optimus:(SELECT name FROM sqlite_master WHERE type='table') AS t
INFO:optimus:jdbc:sqlite:chinook.db
| name (string) |
|---|
| albums |
| sqlite_sequence |
| artists |
| customers |
| employees |
| genres |
| invoices |
| invoice_items |
| media_types |
| playlists |
db.table_to_df("albums",limit="all").table()
INFO:optimus:(SELECT COUNT(*) as COUNT FROM albums) AS t
INFO:optimus:jdbc:sqlite:chinook.db
347 rows
INFO:optimus:SELECT * FROM albums
INFO:optimus:(SELECT * FROM albums) AS t
INFO:optimus:jdbc:sqlite:chinook.db
| AlbumId (int) | Title (string) | ArtistId (int) |
|---|---|---|
| 1 | For Those About To Rock We Salute You | 1 |
| 2 | Balls to the Wall | 2 |
| 3 | Restless and Wild | 2 |
| 4 | Let There Be Rock | 1 |
| 5 | Big Ones | 3 |
| 6 | Jagged Little Pill | 4 |
| 7 | Facelift | 5 |
| 8 | Warner 25 Anos | 6 |
| 9 | Plays Metallica By Four Cellos | 7 |
| 10 | Audioslave | 8 |
db.tables_names_to_json()
INFO:optimus:(SELECT name FROM sqlite_master WHERE type='table') AS t
INFO:optimus:jdbc:sqlite:chinook.db
['albums', 'sqlite_sequence', 'artists', 'customers', 'employees', 'genres', 'invoices', 'invoice_items', 'media_types', 'playlists', 'playlist_track', 'tracks', 'sqlite_stat1']
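Reading can also be paired with writing: the Redshift traceback above shows that `jdbc.py` defines `df_to_table(self, df, table, mode="overwrite")`, so a table pulled into a DataFrame can be written back under another name. A sketch under that assumption (`albums_copy` is a hypothetical target table):

# Round-trip: read the albums table in full, then write it back out.
df = db.table_to_df("albums", limit="all")
db.df_to_table(df, "albums_copy", mode="overwrite")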
df = op.load.csv("https://raw.githubusercontent.com/ironmussa/Optimus/master/examples/data/foo.csv",
                 sep=",", header='true', infer_schema='true', charset="UTF-8", null_value="None")
INFO:optimus:Downloading foo.csv from https://raw.githubusercontent.com/ironmussa/Optimus/master/examples/data/foo.csv
INFO:optimus:Downloaded 967 bytes
INFO:optimus:Creating DataFrame for foo.csv. Please wait...
df.table()
| id (int) | firstName (string) | lastName (string) | billingId (int) | product (string) | price (int) | birth (string) | dummyCol (string) |
|---|---|---|---|---|---|---|---|
| 1 | Luis | Alvarez$$%! | 123 | Cake | 10 | 1980/07/07 | never |
| 2 | André | Ampère | 423 | piza | 8 | 1950/07/08 | gonna |
| 3 | NiELS | Böhr//((%% | 551 | pizza | 8 | 1990/07/09 | give |
| 4 | PAUL | dirac$ | 521 | pizza | 8 | 1954/07/10 | you |
| 5 | Albert | Einstein | 634 | pizza | 8 | 1990/07/11 | up |
| 6 | Galileo | ⋅⋅⋅⋅⋅⋅⋅⋅⋅⋅⋅⋅⋅GALiLEI | 672 | arepa | 5 | 1930/08/12 | never |
| 7 | CaRL | Ga%%%uss | 323 | taco | 3 | 1970/07/13 | gonna |
| 8 | David | H$$$ilbert | 624 | taaaccoo | 3 | 1950/07/14 | let |
| 9 | Johannes | KEPLER | 735 | taco | 3 | 1920/04/22 | you |
| 10 | JaMES | M$$ax%%well | 875 | taco | 3 | 1923/03/12 | down |
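foo.csv is deliberately dirty data: mixed casing (`NiELS`, `CaRL`), junk characters (`Alvarez$$%!`), accented names, and leading whitespace (the `⋅` marks in `GALiLEI`), which makes it a useful payload for the Redis round-trip below.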
# Put your db credentials here
db = op.connect(
    driver="redis",
    host="165.227.196.70",
    port=6379,
    database=1,
    password="")
db.df_to_table(df, "hola1", redis_primary_key="id")
INFO:optimus:`id`,`firstName`,`lastName`,`billingId`,`product`,`price`,`birth`,`dummyCol` column(s) was not processed because is/are not array,vector
INFO:optimus:Outputting 0 columns after filtering. Is this expected?
INFO:optimus:Using 'column_exp' to process column 'id' with function _cast_to
INFO:optimus:Using 'column_exp' to process column 'firstName' with function _cast_to
INFO:optimus:Using 'column_exp' to process column 'lastName' with function _cast_to
INFO:optimus:Using 'column_exp' to process column 'billingId' with function _cast_to
INFO:optimus:Using 'column_exp' to process column 'product' with function _cast_to
INFO:optimus:Using 'column_exp' to process column 'price' with function _cast_to
INFO:optimus:Using 'column_exp' to process column 'birth' with function _cast_to
INFO:optimus:Using 'column_exp' to process column 'dummyCol' with function _cast_to
hola1
# https://stackoverflow.com/questions/56707978/how-to-write-from-a-pyspark-dstream-to-redis
db.table_to_df(0)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-33-b3e61395c772> in <module>
      1 # https://stackoverflow.com/questions/56707978/how-to-write-from-a-pyspark-dstream-to-redis
      2
----> 3 db.table_to_df(0)

~\Documents\Optimus\optimus\io\jdbc.py in table_to_df(self, table_name, columns, limit)
    122
    123         db_table = table_name
--> 124         query = self.driver_context.count_query(db_table=db_table)
    125         if limit == "all":
    126             count = self.execute(query, "all").first()[0]

~\Documents\Optimus\optimus\io\driver_context.py in count_query(self, *args, **kwargs)
     31
     32     def count_query(self, *args, **kwargs) -> str:
---> 33         return self._driver.count_query(*args, **kwargs)

~\Documents\Optimus\optimus\io\sqlserver.py in count_query(self, *args, **kwargs)
     24
     25     def count_query(self, *args, **kwargs) -> str:
---> 26         return "SELECT COUNT(*) as COUNT FROM " + kwargs["db_table"]

TypeError: can only concatenate str (not "int") to str
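The `TypeError` comes from Optimus's query builder, not Redis: `count_query` concatenates `table_name` into a `SELECT COUNT(*)` string, so it must be a string rather than the integer `0`; `db.table_to_df("hola1")` would at least get past this point. Per the Stack Overflow link above, another route is reading straight through the spark-redis data source bundled in the jars loaded at startup. A sketch assuming `op.spark` exposes the SparkSession:

# Read the rows written with df_to_table back out of Redis via spark-redis.
df = (op.spark.read
        .format("org.apache.spark.sql.redis")
        .option("table", "hola1")      # table name used in df_to_table above
        .option("key.column", "id")    # matches redis_primary_key above
        .load())
df.show()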