Mid-Stream Changing Configuration with a Check-Pointed Spark Stream
I have a Spark Streaming / DStream app like this:
    // Function to create and set up a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
      val ssc = new StreamingContext(...)   // new context
      val lines = ssc.socketTextStream(...) // create DStreams
      ...
      ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
      ssc
    }

    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

    // Do additional setup on context that needs to be done,
    // irrespective of whether it is being started or restarted
    context. ...

    // Start the context
    context.start()
    context.awaitTermination()
The context uses a configuration file, from which I can pull items with methods like appConf.getString. So what I actually use is:

    val context = StreamingContext.getOrCreate(
      appConf.getString("spark.checkpointDirectory"),
      () => createStreamContext(sparkConf, appConf))

where val sparkConf = new SparkConf()....
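For reference, here is a minimal sketch of what such a createStreamContext factory might look like. This is an assumption, not the original code: the Typesafe Config usage, the key names (spark.batchDurationSecs, spark.windowDurationSecs), and the socket source are placeholders standing in for the real setup.

    import com.typesafe.config.Config
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Hypothetical factory: builds the context from values read out of the app config.
    // Whatever is read here gets frozen into the checkpoint once the app starts.
    def createStreamContext(sparkConf: SparkConf, appConf: Config): StreamingContext = {
      val batchSecs  = appConf.getLong("spark.batchDurationSecs")   // assumed key
      val windowSecs = appConf.getLong("spark.windowDurationSecs")  // assumed key

      val ssc = new StreamingContext(sparkConf, Seconds(batchSecs))
      val lines = ssc.socketTextStream("localhost", 9999)           // placeholder source

      // windowSecs must be a multiple of batchSecs for the window to be valid
      lines.window(Seconds(windowSecs))
           .count()
           .print()

      ssc.checkpoint(appConf.getString("spark.checkpointDirectory"))
      ssc
    }

Once getOrCreate finds an existing checkpoint, this factory is never called again on restart, which is exactly why edits to the config file appear to be ignored.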
If I stop the app and change the configuration in the app file, these changes are not picked up unless I delete the checkpoint directory contents. For example, I would like to change spark.streaming.kafka.maxRatePerPartition or spark.windowDurationSecs between runs. (Edit: by "change" I mean kill the app, change the configuration file, and then restart the app.) How can I change these settings, or enforce a configuration change, without trashing my checkpoint directory (which will come to include checkpointed state info)?
If you dive into the code for StreamingContext.getOrCreate:
    def getOrCreate(
        checkpointPath: String,
        creatingFunc: () => StreamingContext,
        hadoopConf: Configuration = SparkHadoopUtil.get.conf,
        createOnError: Boolean = false
      ): StreamingContext = {
      val checkpointOption = CheckpointReader.read(
        checkpointPath, new SparkConf(), hadoopConf, createOnError)
      checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
    }
you can see that if CheckpointReader finds checkpointed data at the checkpoint path, it uses a freshly constructed new SparkConf() as the parameter, because the overload doesn't allow passing in a custom-created SparkConf. By default, SparkConf will load any settings declared either as environment variables or passed on the classpath:
    class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

      import SparkConf._

      /** Create a SparkConf that loads defaults from system properties and the classpath */
      def this() = this(true)
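As a small illustration of the mechanism (a sketch, not part of the original app), any JVM system property prefixed with spark. is picked up by a default-constructed SparkConf, which is why values supplied at submit time become visible even though you never get to pass your own conf object into getOrCreate:

    import org.apache.spark.SparkConf

    // Simulate a property that spark-submit would set on the driver JVM.
    sys.props("spark.streaming.kafka.maxRatePerPartition") = "100"

    val conf = new SparkConf() // loadDefaults = true: reads every "spark.*" system property
    println(conf.get("spark.streaming.kafka.maxRatePerPartition")) // prints "100"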
So one way of achieving what you want is, instead of creating the SparkConf object in code, to pass the parameters via spark.driver.extraClassPath and spark.executor.extraClassPath to spark-submit.
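For example, an invocation along these lines (a sketch only: the class name, jar, config directory, and values are placeholders, and you should verify which properties actually take effect on a restart from a checkpoint in your Spark version):

    # Placeholder class name, jar, and config directory.
    # extraClassPath puts the directory containing the (updated) app config file on the
    # classpath; plain --conf properties become spark.* system properties on the driver,
    # so a default-constructed SparkConf will see them.
    spark-submit \
      --class com.example.MyStreamingApp \
      --conf spark.driver.extraClassPath=/etc/myapp/conf \
      --conf spark.executor.extraClassPath=/etc/myapp/conf \
      --conf spark.streaming.kafka.maxRatePerPartition=100 \
      my-streaming-app.jar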