Mid-Stream Changing Configuration with Check-Pointed Spark Stream


I have a Spark Streaming / DStream app like this:

// Function to create and set up a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

where the context uses a configuration file from which I can pull items with methods like appConf.getString. So I use:

val context = StreamingContext.getOrCreate(
    appConf.getString("spark.checkpointDirectory"),
    () => createStreamContext(sparkConf, appConf))

where val sparkConf = new SparkConf()....
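The body of createStreamContext is not shown in the question; purely as an illustrative sketch, assuming appConf is a Typesafe Config object and the key names are placeholders, such a factory might look like:

import com.typesafe.config.Config
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical factory matching the getOrCreate call above;
// key names like "spark.batchDurationSecs" are assumptions.
def createStreamContext(sparkConf: SparkConf, appConf: Config): StreamingContext = {
  val batchSecs = appConf.getLong("spark.batchDurationSecs")
  val ssc = new StreamingContext(sparkConf, Seconds(batchSecs))
  // DStream setup would go here, e.g. ssc.socketTextStream(...)
  ssc.checkpoint(appConf.getString("spark.checkpointDirectory"))
  ssc
}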

If I stop the app and change the configuration in the app file, these changes are not picked up unless I delete the checkpoint directory contents. For example, I would like to change spark.streaming.kafka.maxRatePerPartition or spark.windowDurationSecs dynamically. (Edit: by "dynamically" I mean: kill the app, change the configuration file, and restart the app.) How can I change these settings, or enforce a configuration change, without trashing my checkpoint directory (which includes checkpointed state info)?


If you dive into the code for StreamingContext.getOrCreate:

def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = SparkHadoopUtil.get.conf,
    createOnError: Boolean = false
  ): StreamingContext = {
  val checkpointOption = CheckpointReader.read(
    checkpointPath, new SparkConf(), hadoopConf, createOnError)
  checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}

you can see that if CheckpointReader finds checkpointed data in the checkpoint path, it uses new SparkConf() as the parameter, since the overload doesn't allow passing a custom-created SparkConf. By default, SparkConf will only load settings declared either as environment variables or passed via the classpath:

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

  import SparkConf._

  /** Create a SparkConf that loads defaults from system properties and the classpath */
  def this() = this(true)
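To make the quoted constructor concrete, here is a minimal sketch (not from the original post) showing that any JVM system property whose key starts with "spark." is picked up by a default-constructed SparkConf, which is exactly the constructor getOrCreate uses on recovery:

import org.apache.spark.SparkConf

object ConfPickupDemo extends App {
  // Simulate a setting supplied as a system property,
  // e.g. by spark-submit --conf
  System.setProperty("spark.streaming.kafka.maxRatePerPartition", "100")

  // Same no-arg constructor (loadDefaults = true) used during
  // checkpoint recovery
  val conf = new SparkConf()

  // Prints "100": the value was loaded from the system property
  println(conf.get("spark.streaming.kafka.maxRatePerPartition"))
}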

So one way of achieving what you want is that, instead of creating the SparkConf object in code, you can pass the parameters to spark-submit via spark.driver.extraClassPath and spark.executor.extraClassPath.
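As an illustration only (the class name, jar name, paths, and values below are placeholders, not from the original post), a submission could look like:

spark-submit \
  --class com.example.MyStreamingApp \
  --conf "spark.driver.extraClassPath=/path/to/conf" \
  --conf "spark.executor.extraClassPath=/path/to/conf" \
  --conf "spark.streaming.kafka.maxRatePerPartition=100" \
  my-streaming-app.jar

Settings passed with --conf are set as JVM system properties before your main class runs, so the new SparkConf() constructed during checkpoint recovery will see them, per the snippet above.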

