public class StreamingContext
extends java.lang.Object

Main entry point for Spark Streaming functionality. It provides methods used to create DStreams from various input sources. It can be created by providing a Spark master URL and an appName, from an org.apache.spark.SparkConf configuration (see the core Spark documentation), or from an existing org.apache.spark.SparkContext. The associated SparkContext can be accessed using context.sparkContext. After creating and transforming DStreams, the streaming computation can be started and stopped using context.start() and context.stop(), respectively. context.awaitTermination() allows the current thread to wait for the termination of the context by stop() or by an exception.

Constructor and Description |
---|
StreamingContext(SparkConf conf, Duration batchDuration) - Create a StreamingContext by providing the configuration necessary for a new SparkContext. |
StreamingContext(SparkContext sparkContext, Duration batchDuration) - Create a StreamingContext using an existing SparkContext. |
StreamingContext(java.lang.String path) - Recreate a StreamingContext from a checkpoint file. |
StreamingContext(java.lang.String path, org.apache.hadoop.conf.Configuration hadoopConf) - Recreate a StreamingContext from a checkpoint file. |
StreamingContext(java.lang.String path, SparkContext sparkContext) - Recreate a StreamingContext from a checkpoint file using an existing SparkContext. |
StreamingContext(java.lang.String master, java.lang.String appName, Duration batchDuration, java.lang.String sparkHome, scala.collection.Seq<java.lang.String> jars, scala.collection.Map<java.lang.String,java.lang.String> environment) - Create a StreamingContext by providing the details necessary for creating a new SparkContext. |
Modifier and Type | Method and Description |
---|---|
void | addStreamingListener(StreamingListener streamingListener) - Add a StreamingListener object for receiving system events related to streaming. |
void | awaitTermination() - Wait for the execution to stop. |
boolean | awaitTerminationOrTimeout(long timeout) - Wait for the execution to stop. |
DStream<byte[]> | binaryRecordsStream(java.lang.String directory, int recordLength) - Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as flat binary files, assuming a fixed length per record, generating one byte array per record. |
void | checkpoint(java.lang.String directory) - Set the context to periodically checkpoint the DStream operations for driver fault-tolerance. |
<K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> InputDStream<scala.Tuple2<K,V>> | fileStream(java.lang.String directory, scala.reflect.ClassTag<K> evidence$4, scala.reflect.ClassTag<V> evidence$5, scala.reflect.ClassTag<F> evidence$6) - Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. |
<K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> InputDStream<scala.Tuple2<K,V>> | fileStream(java.lang.String directory, scala.Function1<org.apache.hadoop.fs.Path,java.lang.Object> filter, boolean newFilesOnly, scala.reflect.ClassTag<K> evidence$7, scala.reflect.ClassTag<V> evidence$8, scala.reflect.ClassTag<F> evidence$9) - Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. |
<K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> InputDStream<scala.Tuple2<K,V>> | fileStream(java.lang.String directory, scala.Function1<org.apache.hadoop.fs.Path,java.lang.Object> filter, boolean newFilesOnly, org.apache.hadoop.conf.Configuration conf, scala.reflect.ClassTag<K> evidence$10, scala.reflect.ClassTag<V> evidence$11, scala.reflect.ClassTag<F> evidence$12) - Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. |
static scala.Option<StreamingContext> | getActive() - :: Experimental :: Get the currently active context, if there is one. |
static StreamingContext | getActiveOrCreate(scala.Function0<StreamingContext> creatingFunc) - :: Experimental :: Either return the "active" StreamingContext (that is, started but not stopped), or create a new StreamingContext by calling the provided creatingFunc. |
static StreamingContext | getActiveOrCreate(java.lang.String checkpointPath, scala.Function0<StreamingContext> creatingFunc, org.apache.hadoop.conf.Configuration hadoopConf, boolean createOnError) - :: Experimental :: Either get the currently active StreamingContext (that is, started but not stopped), or recreate a StreamingContext from checkpoint data in the given path. |
static StreamingContext | getOrCreate(java.lang.String checkpointPath, scala.Function0<StreamingContext> creatingFunc, org.apache.hadoop.conf.Configuration hadoopConf, boolean createOnError) - Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. |
StreamingContextState | getState() - :: DeveloperApi :: Return the current state of the context. |
protected static void | initializeLogIfNecessary(boolean isInterpreter) |
protected static boolean | isTraceEnabled() |
static scala.Option<java.lang.String> | jarOfClass(java.lang.Class<?> cls) - Find the JAR from which a given class was loaded, to make it easy for users to pass their JARs to StreamingContext. |
protected static org.slf4j.Logger | log() |
protected static void | logDebug(scala.Function0<java.lang.String> msg) |
protected static void | logDebug(scala.Function0<java.lang.String> msg, java.lang.Throwable throwable) |
protected static void | logError(scala.Function0<java.lang.String> msg) |
protected static void | logError(scala.Function0<java.lang.String> msg, java.lang.Throwable throwable) |
protected static void | logInfo(scala.Function0<java.lang.String> msg) |
protected static void | logInfo(scala.Function0<java.lang.String> msg, java.lang.Throwable throwable) |
protected static java.lang.String | logName() |
protected static void | logTrace(scala.Function0<java.lang.String> msg) |
protected static void | logTrace(scala.Function0<java.lang.String> msg, java.lang.Throwable throwable) |
protected static void | logWarning(scala.Function0<java.lang.String> msg) |
protected static void | logWarning(scala.Function0<java.lang.String> msg, java.lang.Throwable throwable) |
<T> InputDStream<T> | queueStream(scala.collection.mutable.Queue<RDD<T>> queue, boolean oneAtATime, scala.reflect.ClassTag<T> evidence$13) - Create an input stream from a queue of RDDs. |
<T> InputDStream<T> | queueStream(scala.collection.mutable.Queue<RDD<T>> queue, boolean oneAtATime, RDD<T> defaultRDD, scala.reflect.ClassTag<T> evidence$14) - Create an input stream from a queue of RDDs. |
<T> ReceiverInputDStream<T> | rawSocketStream(java.lang.String hostname, int port, StorageLevel storageLevel, scala.reflect.ClassTag<T> evidence$3) - Create an input stream from network source hostname:port, where data is received as serialized blocks (serialized using Spark's serializer) that can be directly pushed into the block manager without deserializing them. |
<T> ReceiverInputDStream<T> | receiverStream(Receiver<T> receiver, scala.reflect.ClassTag<T> evidence$1) - Create an input stream with any arbitrary user-implemented receiver. |
void | remember(Duration duration) - Set each DStream in this context to remember the RDDs it generated in the last given duration. |
<T> ReceiverInputDStream<T> | socketStream(java.lang.String hostname, int port, scala.Function1<java.io.InputStream,scala.collection.Iterator<T>> converter, StorageLevel storageLevel, scala.reflect.ClassTag<T> evidence$2) - Creates an input stream from TCP source hostname:port. |
ReceiverInputDStream<java.lang.String> | socketTextStream(java.lang.String hostname, int port, StorageLevel storageLevel) - Creates an input stream from TCP source hostname:port. |
SparkContext | sparkContext() - Return the associated Spark context. |
void | start() - Start the execution of the streams. |
void | stop(boolean stopSparkContext) - Stop the execution of the streams immediately (does not wait for all received data to be processed). |
void | stop(boolean stopSparkContext, boolean stopGracefully) - Stop the execution of the streams, with the option of ensuring all received data has been processed. |
DStream<java.lang.String> | textFileStream(java.lang.String directory) - Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files (using key as LongWritable, value as Text and input format as TextInputFormat). |
<T> DStream<T> | transform(scala.collection.Seq<DStream<?>> dstreams, scala.Function2<scala.collection.Seq<RDD<?>>,Time,RDD<T>> transformFunc, scala.reflect.ClassTag<T> evidence$16) - Create a new DStream in which each RDD is generated by applying a function on RDDs of the DStreams. |
<T> DStream<T> | union(scala.collection.Seq<DStream<T>> streams, scala.reflect.ClassTag<T> evidence$15) - Create a unified DStream from multiple DStreams of the same type and same slide duration. |
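
The binaryRecordsStream entry in the table above reads files as flat binary data with a fixed length per record, producing one byte array per record. The following is a minimal plain-Java sketch of that record layout only; it does not use Spark. The class name FixedLengthRecords is hypothetical, and rejecting a trailing partial record is an assumption of this sketch, not documented behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model only: not Spark's implementation of binaryRecordsStream.
final class FixedLengthRecords {

    /** Splits flat binary data into records of exactly recordLength bytes each. */
    static List<byte[]> split(byte[] data, int recordLength) {
        if (recordLength <= 0) {
            throw new IllegalArgumentException("recordLength must be positive");
        }
        // Assumption of this sketch: a trailing partial record is an error.
        if (data.length % recordLength != 0) {
            throw new IllegalArgumentException("data length is not a multiple of recordLength");
        }
        List<byte[]> records = new ArrayList<>();
        for (int offset = 0; offset < data.length; offset += recordLength) {
            // Each record is an independent copy of one fixed-size slice.
            records.add(Arrays.copyOfRange(data, offset, offset + recordLength));
        }
        return records;
    }

    public static void main(String[] args) {
        for (byte[] record : split(new byte[] {1, 2, 3, 4, 5, 6}, 2)) {
            System.out.println(Arrays.toString(record));
        }
    }
}
```

Every array returned by split has length recordLength, which mirrors the guarantee the method gives for the records in the resulting DStream.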
public StreamingContext(SparkContext sparkContext, Duration batchDuration)
sparkContext - existing SparkContext
batchDuration - the time interval at which streaming data will be divided into batches

public StreamingContext(SparkConf conf, Duration batchDuration)
conf - an org.apache.spark.SparkConf object specifying Spark parameters
batchDuration - the time interval at which streaming data will be divided into batches

public StreamingContext(java.lang.String master, java.lang.String appName, Duration batchDuration, java.lang.String sparkHome, scala.collection.Seq<java.lang.String> jars, scala.collection.Map<java.lang.String,java.lang.String> environment)
master - cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4])
appName - a name for your job, to display on the cluster web UI
batchDuration - the time interval at which streaming data will be divided into batches
sparkHome - (undocumented)
jars - (undocumented)
environment - (undocumented)

public StreamingContext(java.lang.String path, org.apache.hadoop.conf.Configuration hadoopConf)
path - path to the directory that was specified as the checkpoint directory
hadoopConf - optional configuration object, if necessary for reading from HDFS-compatible filesystems

public StreamingContext(java.lang.String path)
path - path to the directory that was specified as the checkpoint directory

public StreamingContext(java.lang.String path, SparkContext sparkContext)
path - path to the directory that was specified as the checkpoint directory
sparkContext - existing SparkContext

public static scala.Option<StreamingContext> getActive()
Get the currently active context, if there is one. Active means started but not stopped.
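
The notion of an "active" context (started but not stopped) can be pictured as a single shared slot that is filled when a context starts and cleared when it stops. The sketch below is a plain-Java model of that idea under stated assumptions: ActiveHolder, markStarted, and markStopped are hypothetical names, java.util.Optional and java.util.function.Supplier stand in for scala.Option and scala.Function0, and nothing here is Spark's actual implementation.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Illustrative model only; ActiveHolder is a hypothetical class, not part of Spark.
final class ActiveHolder<C> {
    private final AtomicReference<C> active = new AtomicReference<>();

    /** Records a context as active once it has been started. */
    void markStarted(C context) {
        active.set(context);
    }

    /** Clears the slot, but only if the given context is the one currently active. */
    void markStopped(C context) {
        active.compareAndSet(context, null);
    }

    /** Returns the active context, if there is one. */
    Optional<C> getActive() {
        return Optional.ofNullable(active.get());
    }

    /** Returns the active context, or a new one built by creatingFunc. */
    C getActiveOrCreate(Supplier<C> creatingFunc) {
        // The newly created context is not registered here: in this model it
        // only becomes active once markStarted is called for it.
        return getActive().orElseGet(creatingFunc);
    }

    public static void main(String[] args) {
        ActiveHolder<String> holder = new ActiveHolder<>();
        System.out.println(holder.getActiveOrCreate(() -> "fresh context"));
        holder.markStarted("running context");
        System.out.println(holder.getActiveOrCreate(() -> "fresh context"));
    }
}
```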

public static StreamingContext getActiveOrCreate(scala.Function0<StreamingContext> creatingFunc)
Either return the "active" StreamingContext (that is, started but not stopped), or create a new StreamingContext by calling the provided creatingFunc.
creatingFunc - function to create a new StreamingContext

public static StreamingContext getActiveOrCreate(java.lang.String checkpointPath, scala.Function0<StreamingContext> creatingFunc, org.apache.hadoop.conf.Configuration hadoopConf, boolean createOnError)
Either get the currently active StreamingContext (that is, started but not stopped), or recreate a StreamingContext from checkpoint data in the given path. If checkpoint data does not exist in the provided path, then create a new StreamingContext by calling the provided creatingFunc.
checkpointPath - checkpoint directory used in an earlier StreamingContext program
creatingFunc - function to create a new StreamingContext
hadoopConf - optional Hadoop configuration, if necessary for reading from the file system
createOnError - optional; whether to create a new StreamingContext if there is an error in reading checkpoint data. By default, an exception will be thrown on error.

public static StreamingContext getOrCreate(java.lang.String checkpointPath, scala.Function0<StreamingContext> creatingFunc, org.apache.hadoop.conf.Configuration hadoopConf, boolean createOnError)
Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. If checkpoint data exists in the provided checkpointPath, then the StreamingContext will be recreated from the checkpoint data. If the data does not exist, then the StreamingContext will be created by calling the provided creatingFunc.
checkpointPath - checkpoint directory used in an earlier StreamingContext program
creatingFunc - function to create a new StreamingContext
hadoopConf - optional Hadoop configuration, if necessary for reading from the file system
createOnError - optional; whether to create a new StreamingContext if there is an error in reading checkpoint data. By default, an exception will be thrown on error.

public static scala.Option<java.lang.String> jarOfClass(java.lang.Class<?> cls)
Find the JAR from which a given class was loaded, to make it easy for users to pass their JARs to StreamingContext.
cls - (undocumented)

protected static java.lang.String logName()
protected static org.slf4j.Logger log()
protected static void logInfo(scala.Function0<java.lang.String> msg)
protected static void logDebug(scala.Function0<java.lang.String> msg)
protected static void logTrace(scala.Function0<java.lang.String> msg)
protected static void logWarning(scala.Function0<java.lang.String> msg)
protected static void logError(scala.Function0<java.lang.String> msg)
protected static void logInfo(scala.Function0<java.lang.String> msg, java.lang.Throwable throwable)
protected static void logDebug(scala.Function0<java.lang.String> msg, java.lang.Throwable throwable)
protected static void logTrace(scala.Function0<java.lang.String> msg, java.lang.Throwable throwable)
protected static void logWarning(scala.Function0<java.lang.String> msg, java.lang.Throwable throwable)
protected static void logError(scala.Function0<java.lang.String> msg, java.lang.Throwable throwable)
protected static boolean isTraceEnabled()
protected static void initializeLogIfNecessary(boolean isInterpreter)
public SparkContext sparkContext()
public void remember(Duration duration)
duration - minimum duration that each DStream should remember its RDDs

public void checkpoint(java.lang.String directory)
directory - HDFS-compatible directory where the checkpoint data will be reliably stored. Note that this must be a fault-tolerant file system like HDFS.

public <T> ReceiverInputDStream<T> receiverStream(Receiver<T> receiver, scala.reflect.ClassTag<T> evidence$1)
receiver - custom implementation of Receiver
evidence$1 - (undocumented)

public ReceiverInputDStream<java.lang.String> socketTextStream(java.lang.String hostname, int port, StorageLevel storageLevel)
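Creates an input stream from TCP source hostname:port. Data is received using a TCP socket and the received bytes are interpreted as UTF-8 encoded, \n delimited lines.
hostname - hostname to connect to for receiving data
port - port to connect to for receiving data
storageLevel - storage level to use for storing the received objects (default: StorageLevel.MEMORY_AND_DISK_SER_2)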
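
Reading newline-delimited text from a byte stream is the kind of work a converter from java.io.InputStream to an iterator performs. The following plain-Java sketch shows one way to write such a converter, assuming UTF-8 input; LineConverter and bytesToLines are hypothetical names, it returns a java.util.Iterator, not the scala.collection.Iterator the Spark API expects, and it is not Spark's own code.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Illustrative model only; LineConverter is a hypothetical class, not part of Spark.
final class LineConverter {

    /** Lazily turns a byte stream into an iterator over its text lines. */
    static Iterator<String> bytesToLines(InputStream in) {
        BufferedReader reader =
            new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        return new Iterator<String>() {
            // Look-ahead buffer: null means the stream is exhausted.
            private String next = readLine();

            private String readLine() {
                try {
                    // Note: readLine also treats \r and \r\n as line terminators.
                    return reader.readLine();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            @Override
            public boolean hasNext() {
                return next != null;
            }

            @Override
            public String next() {
                if (next == null) {
                    throw new NoSuchElementException();
                }
                String current = next;
                next = readLine();
                return current;
            }
        };
    }

    public static void main(String[] args) {
        byte[] bytes = "first line\nsecond line\n".getBytes(StandardCharsets.UTF_8);
        Iterator<String> lines = bytesToLines(new ByteArrayInputStream(bytes));
        while (lines.hasNext()) {
            System.out.println(lines.next());
        }
    }
}
```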

public <T> ReceiverInputDStream<T> socketStream(java.lang.String hostname, int port, scala.Function1<java.io.InputStream,scala.collection.Iterator<T>> converter, StorageLevel storageLevel, scala.reflect.ClassTag<T> evidence$2)
hostname - hostname to connect to for receiving data
port - port to connect to for receiving data
converter - function to convert the byte stream to objects
storageLevel - storage level to use for storing the received objects
evidence$2 - (undocumented)

public <T> ReceiverInputDStream<T> rawSocketStream(java.lang.String hostname, int port, StorageLevel storageLevel, scala.reflect.ClassTag<T> evidence$3)
hostname - hostname to connect to for receiving data
port - port to connect to for receiving data
storageLevel - storage level to use for storing the received objects (default: StorageLevel.MEMORY_AND_DISK_SER_2)
evidence$3 - (undocumented)

public <K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> InputDStream<scala.Tuple2<K,V>> fileStream(java.lang.String directory, scala.reflect.ClassTag<K> evidence$4, scala.reflect.ClassTag<V> evidence$5, scala.reflect.ClassTag<F> evidence$6)
directory - HDFS directory to monitor for new files
evidence$4 - (undocumented)
evidence$5 - (undocumented)
evidence$6 - (undocumented)

public <K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> InputDStream<scala.Tuple2<K,V>> fileStream(java.lang.String directory, scala.Function1<org.apache.hadoop.fs.Path,java.lang.Object> filter, boolean newFilesOnly, scala.reflect.ClassTag<K> evidence$7, scala.reflect.ClassTag<V> evidence$8, scala.reflect.ClassTag<F> evidence$9)
directory - HDFS directory to monitor for new files
filter - function to filter paths to process
newFilesOnly - whether to process only new files and ignore existing files in the directory
evidence$7 - (undocumented)
evidence$8 - (undocumented)
evidence$9 - (undocumented)

public <K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> InputDStream<scala.Tuple2<K,V>> fileStream(java.lang.String directory, scala.Function1<org.apache.hadoop.fs.Path,java.lang.Object> filter, boolean newFilesOnly, org.apache.hadoop.conf.Configuration conf, scala.reflect.ClassTag<K> evidence$10, scala.reflect.ClassTag<V> evidence$11, scala.reflect.ClassTag<F> evidence$12)
directory - HDFS directory to monitor for new files
filter - function to filter paths to process
newFilesOnly - whether to process only new files and ignore existing files in the directory
conf - Hadoop configuration
evidence$10 - (undocumented)
evidence$11 - (undocumented)
evidence$12 - (undocumented)

public DStream<java.lang.String> textFileStream(java.lang.String directory)
directory - HDFS directory to monitor for new files

public DStream<byte[]> binaryRecordsStream(java.lang.String directory, int recordLength)
Note: the byte array for each record in the resulting RDDs of the DStream is guaranteed to have the provided record length.
directory - HDFS directory to monitor for new files
recordLength - length of each record in bytes

public <T> InputDStream<T> queueStream(scala.collection.mutable.Queue<RDD<T>> queue, boolean oneAtATime, scala.reflect.ClassTag<T> evidence$13)
NOTE: Because arbitrary RDDs can be added to queueStream, there is no way to recover the data of those RDDs, so queueStream does not support checkpointing.
queue - queue of RDDs. Modifications to this data structure must be synchronized.
oneAtATime - whether only one RDD should be consumed from the queue in every interval
evidence$13 - (undocumented)

public <T> InputDStream<T> queueStream(scala.collection.mutable.Queue<RDD<T>> queue, boolean oneAtATime, RDD<T> defaultRDD, scala.reflect.ClassTag<T> evidence$14)
NOTE: Because arbitrary RDDs can be added to queueStream, there is no way to recover the data of those RDDs, so queueStream does not support checkpointing.
queue - queue of RDDs. Modifications to this data structure must be synchronized.
oneAtATime - whether only one RDD should be consumed from the queue in every interval
defaultRDD - default RDD returned by the DStream when the queue is empty. Set to null if no RDD should be returned when the queue is empty.
evidence$14 - (undocumented)

public <T> DStream<T> union(scala.collection.Seq<DStream<T>> streams, scala.reflect.ClassTag<T> evidence$15)
streams - (undocumented)
evidence$15 - (undocumented)

public <T> DStream<T> transform(scala.collection.Seq<DStream<?>> dstreams, scala.Function2<scala.collection.Seq<RDD<?>>,Time,RDD<T>> transformFunc, scala.reflect.ClassTag<T> evidence$16)
dstreams - (undocumented)
transformFunc - (undocumented)
evidence$16 - (undocumented)

public void addStreamingListener(StreamingListener streamingListener)
Add a StreamingListener object for receiving system events related to streaming.
streamingListener - (undocumented)

public StreamingContextState getState()
Return the current state of the context. The context can be in one of three possible states:
- StreamingContextState.INITIALIZED - The context has been created, but not started yet. Input DStreams, transformations and output operations can be created on the context.
- StreamingContextState.ACTIVE - The context has been started, and not stopped. Input DStreams, transformations and output operations cannot be created on the context.
- StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
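
The three states listed above form a small one-way lifecycle. The sketch below models it in plain Java so the allowed transitions are explicit. ContextLifecycle and addInput are hypothetical names; treating start() on an already ACTIVE context as a no-op and allowing stop() from any state are assumptions of this sketch, not documented behavior.

```java
// Illustrative model only; ContextLifecycle is a hypothetical class, not part of Spark.
final class ContextLifecycle {
    enum State { INITIALIZED, ACTIVE, STOPPED }

    private State state = State.INITIALIZED;

    synchronized State getState() {
        return state;
    }

    /** Inputs and transformations may only be declared before the context starts. */
    synchronized void addInput(String name) {
        if (state != State.INITIALIZED) {
            throw new IllegalStateException(
                "cannot add input '" + name + "' in state " + state);
        }
    }

    /** Starting a stopped context fails, matching the documented IllegalStateException. */
    synchronized void start() {
        if (state == State.STOPPED) {
            throw new IllegalStateException("context has already been stopped");
        }
        // Assumption: starting an already ACTIVE context is a no-op here.
        state = State.ACTIVE;
    }

    /** Assumption: stop is allowed from any state and is final. */
    synchronized void stop() {
        state = State.STOPPED;
    }

    public static void main(String[] args) {
        ContextLifecycle context = new ContextLifecycle();
        context.addInput("lines");
        context.start();
        System.out.println(context.getState());
        context.stop();
        System.out.println(context.getState());
    }
}
```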

public void start()
Start the execution of the streams.
Throws: java.lang.IllegalStateException - if the StreamingContext is already stopped.

public void awaitTermination()
Wait for the execution to stop.
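
Waiting for termination, with or without a timeout, can be pictured as blocking on a latch that is released either by a stop or by an error. The plain-Java sketch below models that contract: true when the context stopped, false when the timeout elapsed first, and an exception when an error was reported. TerminationWaiter, notifyStop, and notifyError are hypothetical names, and wrapping the error in a RuntimeException is a simplification of this sketch, not Spark's behavior.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative model only; TerminationWaiter is a hypothetical class, not part of Spark.
final class TerminationWaiter {
    private final CountDownLatch finished = new CountDownLatch(1);
    private volatile Throwable error;

    /** Called when the context is stopped normally. */
    void notifyStop() {
        finished.countDown();
    }

    /** Called when execution fails; waiters will see the error. */
    void notifyError(Throwable t) {
        error = t;
        finished.countDown();
    }

    /** Returns true if stopped, false if the timeout elapsed; throws if an error was reported. */
    boolean awaitTerminationOrTimeout(long timeoutMillis) {
        boolean done;
        try {
            done = finished.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        }
        if (error != null) {
            // Simplification: the reported error is wrapped rather than rethrown as-is.
            throw new RuntimeException("execution failed", error);
        }
        return done;
    }

    public static void main(String[] args) {
        TerminationWaiter waiter = new TerminationWaiter();
        System.out.println(waiter.awaitTerminationOrTimeout(50));
        waiter.notifyStop();
        System.out.println(waiter.awaitTerminationOrTimeout(50));
    }
}
```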
public boolean awaitTerminationOrTimeout(long timeout)
timeout - time to wait in milliseconds
Returns: true if the context has stopped, or false if the waiting time elapsed before the method returned. An error reported during the execution is thrown instead.

public void stop(boolean stopSparkContext)
Stop the execution of the streams immediately (does not wait for all received data to be processed). By default, if stopSparkContext is not specified, the underlying SparkContext will also be stopped. This implicit behavior can be configured using the SparkConf configuration spark.streaming.stopSparkContextByDefault.
stopSparkContext - if true, stops the associated SparkContext. The underlying SparkContext will be stopped regardless of whether this StreamingContext has been started.

public void stop(boolean stopSparkContext, boolean stopGracefully)
stopSparkContext - if true, stops the associated SparkContext. The underlying SparkContext will be stopped regardless of whether this StreamingContext has been started.
stopGracefully - if true, stops gracefully by waiting for the processing of all received data to be completed
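
The difference between an immediate and a graceful stop comes down to what happens to data that has been received but not yet processed. The plain-Java sketch below models that distinction with a simple queue. BatchQueueModel and its methods are hypothetical names, and discarding pending batches on an immediate stop is a simplifying assumption of this sketch; it is not how Spark is implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative model only; BatchQueueModel is a hypothetical class, not part of Spark.
final class BatchQueueModel {
    private final Deque<String> received = new ArrayDeque<>();
    private final List<String> processed = new ArrayList<>();
    private boolean stopped = false;

    /** Buffers a batch that has arrived but has not been processed yet. */
    void receive(String batch) {
        if (stopped) {
            throw new IllegalStateException("already stopped");
        }
        received.add(batch);
    }

    /** Stops the model; a graceful stop first processes everything already received. */
    void stop(boolean stopGracefully) {
        if (stopGracefully) {
            while (!received.isEmpty()) {
                processed.add(received.poll());
            }
        } else {
            // Assumption: an immediate stop drops whatever is still pending.
            received.clear();
        }
        stopped = true;
    }

    List<String> processed() {
        return processed;
    }

    public static void main(String[] args) {
        BatchQueueModel model = new BatchQueueModel();
        model.receive("batch-1");
        model.receive("batch-2");
        model.stop(true);
        System.out.println(model.processed());
    }
}
```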