Flink Analysis and Usage, Part 5: JobGraph Generation and Distribution
Published: 2019-03-04


1. JobGraph

Flink works with several kinds of graphs. The StreamGraph is built from the user program and is used to generate the JobGraph; the dispatcher then distributes the JobGraph, from which the ExecutionGraph is generated, and finally the graph of Tasks that actually execute (that last graph is only a logical concept, not a separate data structure). The JobGraph is a crucial link in this chain; the other graphs will be analyzed in detail later. As anyone who has studied graphs knows, what distinguishes a graph from other data structures is that it has vertices and edges. A quick way to see this pipeline from the client side is sketched right after this paragraph.
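The following is a minimal illustrative sketch (my own, not taken from the article's code) of how a DataStream program is turned first into a StreamGraph and then into the JobGraph that is submitted to the cluster. The visibility of getStreamGraph()/getJobGraph() may differ slightly between Flink versions.

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class JobGraphPeek {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.fromElements("a", "b", "c").print();

		// The user program is first translated into a StreamGraph ...
		StreamGraph streamGraph = env.getStreamGraph();
		// ... which then produces the JobGraph that is shipped to the cluster.
		JobGraph jobGraph = streamGraph.getJobGraph();

		System.out.println(jobGraph);                       // JobGraph(jobId: ...)
		System.out.println(jobGraph.getNumberOfVertices()); // number of (chained) JobVertices
	}
}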

Now let's look at how this graph is defined in Flink:

public class JobGraph implements Serializable {

	private static final long serialVersionUID = 1L;

	// --- job and configuration ---

	/** List of task vertices included in this job graph. */
	private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();

	/** The job configuration attached to this job. */
	private final Configuration jobConfiguration = new Configuration();

	/** ID of this job. May be set if specific job id is desired (e.g. session management) */
	private final JobID jobID;

	/** Name of this job. */
	private final String jobName;

	/** The number of seconds after which the corresponding ExecutionGraph is removed at the
	 * job manager after it has been executed. */
	private long sessionTimeout = 0;

	/** flag to enable queued scheduling */
	private boolean allowQueuedScheduling;

	/** The mode in which the job is scheduled */
	private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;

	// --- checkpointing ---

	/** Job specific execution config */
	private SerializedValue<ExecutionConfig> serializedExecutionConfig;

	/** The settings for the job checkpoints */
	private JobCheckpointingSettings snapshotSettings;

	/** Savepoint restore settings. */
	private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();

	// --- attached resources ---

	/** Set of JAR files required to run this job. */
	private final List<Path> userJars = new ArrayList<Path>();

	/** Set of custom files required to run this job. */
	private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = new HashMap<>();

	/** Set of blob keys identifying the JAR files required to run this job. */
	private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<>();

	/** List of classpaths required to run this job. */
	private List<URL> classpaths = Collections.emptyList();

	// --------------------------------------------------------------------------------------------

	/**
	 * Constructs a new job graph with the given name, the given {@link ExecutionConfig},
	 * and a random job ID. The ExecutionConfig will be serialized and can't be modified afterwards.
	 *
	 * @param jobName The name of the job.
	 */
	public JobGraph(String jobName) {
		this(null, jobName);
	}

	/**
	 * Constructs a new job graph with the given job ID (or a random ID, if {@code null} is passed),
	 * the given name and the given execution configuration (see {@link ExecutionConfig}).
	 * The ExecutionConfig will be serialized and can't be modified afterwards.
	 *
	 * @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
	 * @param jobName The name of the job.
	 */
	public JobGraph(JobID jobId, String jobName) {
		this.jobID = jobId == null ? new JobID() : jobId;
		this.jobName = jobName == null ? "(unnamed job)" : jobName;

		try {
			setExecutionConfig(new ExecutionConfig());
		} catch (IOException e) {
			// this should never happen, since an empty execution config is always serializable
			throw new RuntimeException("bug, empty execution config is not serializable");
		}
	}

	/**
	 * Constructs a new job graph with no name, a random job ID, the given {@link ExecutionConfig}, and
	 * the given job vertices. The ExecutionConfig will be serialized and can't be modified afterwards.
	 *
	 * @param vertices The vertices to add to the graph.
	 */
	public JobGraph(JobVertex... vertices) {
		this(null, vertices);
	}

	/**
	 * Constructs a new job graph with the given name, the given {@link ExecutionConfig}, a random job ID,
	 * and the given job vertices. The ExecutionConfig will be serialized and can't be modified afterwards.
	 *
	 * @param jobName The name of the job.
	 * @param vertices The vertices to add to the graph.
	 */
	public JobGraph(String jobName, JobVertex... vertices) {
		this(null, jobName, vertices);
	}

	/**
	 * Constructs a new job graph with the given name, the given {@link ExecutionConfig},
	 * the given jobId or a random one if null supplied, and the given job vertices.
	 * The ExecutionConfig will be serialized and can't be modified afterwards.
	 *
	 * @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
	 * @param jobName The name of the job.
	 * @param vertices The vertices to add to the graph.
	 */
	public JobGraph(JobID jobId, String jobName, JobVertex... vertices) {
		this(jobId, jobName);

		for (JobVertex vertex : vertices) {
			addVertex(vertex);
		}
	}

	......

	/**
	 * Sets the savepoint restore settings.
	 * @param settings The savepoint restore settings.
	 */
	public void setSavepointRestoreSettings(SavepointRestoreSettings settings) {
		this.savepointRestoreSettings = checkNotNull(settings, "Savepoint restore settings");
	}

	/**
	 * Returns the configured savepoint restore setting.
	 * @return The configured savepoint restore settings.
	 */
	public SavepointRestoreSettings getSavepointRestoreSettings() {
		return savepointRestoreSettings;
	}

	/**
	 * Sets the execution config. This method eagerly serialized the ExecutionConfig for future RPC
	 * transport. Further modification of the referenced ExecutionConfig object will not affect
	 * this serialized copy.
	 *
	 * @param executionConfig The ExecutionConfig to be serialized.
	 * @throws IOException Thrown if the serialization of the ExecutionConfig fails
	 */
	public void setExecutionConfig(ExecutionConfig executionConfig) throws IOException {
		checkNotNull(executionConfig, "ExecutionConfig must not be null.");
		this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
	}

	/**
	 * Adds a new task vertex to the job graph if it is not already included.
	 *
	 * @param vertex
	 *        the new task vertex to be added
	 */
	public void addVertex(JobVertex vertex) {
		final JobVertexID id = vertex.getID();
		JobVertex previous = taskVertices.put(id, vertex);

		// if we had a prior association, restore and throw an exception
		if (previous != null) {
			taskVertices.put(id, previous);
			throw new IllegalArgumentException("The JobGraph already contains a vertex with that id.");
		}
	}

	/**
	 * Returns an Iterable to iterate all vertices registered with the job graph.
	 *
	 * @return an Iterable to iterate all vertices registered with the job graph
	 */
	public Iterable<JobVertex> getVertices() {
		return this.taskVertices.values();
	}

	/**
	 * Returns an array of all job vertices that are registered with the job graph. The order in which the vertices
	 * appear in the list is not defined.
	 *
	 * @return an array of all job vertices that are registered with the job graph
	 */
	public JobVertex[] getVerticesAsArray() {
		return this.taskVertices.values().toArray(new JobVertex[this.taskVertices.size()]);
	}

	/**
	 * Returns the number of all vertices.
	 *
	 * @return The number of all vertices.
	 */
	public int getNumberOfVertices() {
		return this.taskVertices.size();
	}

	/**
	 * Sets the settings for asynchronous snapshots. A value of {@code null} means that
	 * snapshotting is not enabled.
	 *
	 * @param settings The snapshot settings
	 */
	public void setSnapshotSettings(JobCheckpointingSettings settings) {
		this.snapshotSettings = settings;
	}

	/**
	 * Gets the settings for asynchronous snapshots. This method returns null, when
	 * checkpointing is not enabled.
	 *
	 * @return The snapshot settings
	 */
	public JobCheckpointingSettings getCheckpointingSettings() {
		return snapshotSettings;
	}

	/**
	 * Checks if the checkpointing was enabled for this job graph
	 *
	 * @return true if checkpointing enabled
	 */
	public boolean isCheckpointingEnabled() {

		if (snapshotSettings == null) {
			return false;
		}

		long checkpointInterval = snapshotSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval();
		return checkpointInterval > 0 && checkpointInterval < Long.MAX_VALUE;
	}

	/**
	 * Searches for a vertex with a matching ID and returns it.
	 *
	 * @param id
	 *        the ID of the vertex to search for
	 * @return the vertex with the matching ID or <code>null</code> if no vertex with such ID could be found
	 */
	public JobVertex findVertexByID(JobVertexID id) {
		return this.taskVertices.get(id);
	}

	/**
	 * Sets the classpaths required to run the job on a task manager.
	 *
	 * @param paths paths of the directories/JAR files required to run the job on a task manager
	 */
	public void setClasspaths(List<URL> paths) {
		classpaths = paths;
	}

	public List<URL> getClasspaths() {
		return classpaths;
	}

	/**
	 * Gets the maximum parallelism of all operations in this job graph.
	 *
	 * @return The maximum parallelism of this job graph
	 */
	public int getMaximumParallelism() {
		int maxParallelism = -1;
		for (JobVertex vertex : taskVertices.values()) {
			maxParallelism = Math.max(vertex.getParallelism(), maxParallelism);
		}
		return maxParallelism;
	}

	// --------------------------------------------------------------------------------------------
	//  Topological Graph Access
	// --------------------------------------------------------------------------------------------

	public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
		// early out on empty lists
		if (this.taskVertices.isEmpty()) {
			return Collections.emptyList();
		}

		List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
		Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());

		// start by finding the vertices with no input edges
		// and the ones with disconnected inputs (that refer to some standalone data set)
		{
			Iterator<JobVertex> iter = remaining.iterator();
			while (iter.hasNext()) {
				JobVertex vertex = iter.next();

				if (vertex.hasNoConnectedInputs()) {
					sorted.add(vertex);
					iter.remove();
				}
			}
		}

		int startNodePos = 0;

		// traverse from the nodes that were added until we found all elements
		while (!remaining.isEmpty()) {

			// first check if we have more candidates to start traversing from. if not, then the
			// graph is cyclic, which is not permitted
			if (startNodePos >= sorted.size()) {
				throw new InvalidProgramException("The job graph is cyclic.");
			}

			JobVertex current = sorted.get(startNodePos++);
			addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
		}

		return sorted;
	}

	private void addNodesThatHaveNoNewPredecessors(JobVertex start, List<JobVertex> target, Set<JobVertex> remaining) {

		// forward traverse over all produced data sets and all their consumers
		for (IntermediateDataSet dataSet : start.getProducedDataSets()) {
			for (JobEdge edge : dataSet.getConsumers()) {

				// a vertex can be added, if it has no predecessors that are still in the 'remaining' set
				JobVertex v = edge.getTarget();
				if (!remaining.contains(v)) {
					continue;
				}

				boolean hasNewPredecessors = false;

				for (JobEdge e : v.getInputs()) {
					// skip the edge through which we came
					if (e == edge) {
						continue;
					}

					IntermediateDataSet source = e.getSource();
					if (remaining.contains(source.getProducer())) {
						hasNewPredecessors = true;
						break;
					}
				}

				if (!hasNewPredecessors) {
					target.add(v);
					remaining.remove(v);
					addNodesThatHaveNoNewPredecessors(v, target, remaining);
				}
			}
		}
	}

	// --------------------------------------------------------------------------------------------
	//  Handling of attached JAR files
	// --------------------------------------------------------------------------------------------

	/**
	 * Adds the path of a JAR file required to run the job on a task manager.
	 *
	 * @param jar
	 *        path of the JAR file required to run the job on a task manager
	 */
	public void addJar(Path jar) {
		if (jar == null) {
			throw new IllegalArgumentException();
		}

		if (!userJars.contains(jar)) {
			userJars.add(jar);
		}
	}

	/**
	 * Gets the list of assigned user jar paths.
	 *
	 * @return The list of assigned user jar paths
	 */
	public List<Path> getUserJars() {
		return userJars;
	}

	/**
	 * Adds the path of a custom file required to run the job on a task manager.
	 *
	 * @param name a name under which this artifact will be accessible through {@link DistributedCache}
	 * @param file path of a custom file required to run the job on a task manager
	 */
	public void addUserArtifact(String name, DistributedCache.DistributedCacheEntry file) {
		if (file == null) {
			throw new IllegalArgumentException();
		}

		userArtifacts.putIfAbsent(name, file);
	}

	/**
	 * Gets the list of assigned user jar paths.
	 *
	 * @return The list of assigned user jar paths
	 */
	public Map<String, DistributedCache.DistributedCacheEntry> getUserArtifacts() {
		return userArtifacts;
	}

	/**
	 * Adds the BLOB referenced by the key to the JobGraph's dependencies.
	 *
	 * @param key
	 *        path of the JAR file required to run the job on a task manager
	 */
	public void addUserJarBlobKey(PermanentBlobKey key) {
		if (key == null) {
			throw new IllegalArgumentException();
		}

		if (!userJarBlobKeys.contains(key)) {
			userJarBlobKeys.add(key);
		}
	}

	/**
	 * Checks whether the JobGraph has user code JAR files attached.
	 *
	 * @return True, if the JobGraph has user code JAR files attached, false otherwise.
	 */
	public boolean hasUsercodeJarFiles() {
		return this.userJars.size() > 0;
	}

	/**
	 * Returns a set of BLOB keys referring to the JAR files required to run this job.
	 *
	 * @return set of BLOB keys referring to the JAR files required to run this job
	 */
	public List<PermanentBlobKey> getUserJarBlobKeys() {
		return this.userJarBlobKeys;
	}

	@Override
	public String toString() {
		return "JobGraph(jobId: " + jobID + ")";
	}

	public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) throws IOException {
		byte[] serializedBlobKey;
		serializedBlobKey = InstantiationUtil.serializeObject(blobKey);

		userArtifacts.computeIfPresent(entryName, (key, originalEntry) -> new DistributedCache.DistributedCacheEntry(
			originalEntry.filePath,
			originalEntry.isExecutable,
			serializedBlobKey,
			originalEntry.isZipped
		));
	}

	public void writeUserArtifactEntriesToConfiguration() {
		for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
			DistributedCache.writeFileInfoToConfig(
				userArtifact.getKey(),
				userArtifact.getValue(),
				jobConfiguration
			);
		}
	}
}

The JobGraph is organized as a DAG (directed acyclic graph), built by connecting vertices through intermediate results. In the class definition above, note the map field of type Map<JobVertexID, JobVertex>, the ScheduleMode, and the checkpoint (snapshot) settings; in the method addNodesThatHaveNoNewPredecessors you can also see JobEdge, which acts as the data input channel of a JobVertex, i.e. the connecting edge of the graph. A small hand-wiring sketch follows; after it, let's look at the definition of JobVertex:
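This sketch is my own illustration, not code from the article: it wires two vertices into a JobGraph using only the API quoted in this post (connectNewDataSetAsInput appears in the JobVertex definition below). A real job would also set the invokable class of each vertex, which is omitted here.

import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;

public class JobGraphWiring {
	public static void main(String[] args) throws Exception {
		JobVertex source = new JobVertex("source");
		JobVertex sink = new JobVertex("sink");
		source.setParallelism(2);
		sink.setParallelism(2);

		// Wires sink to source: the source creates an IntermediateDataSet and the
		// sink registers a JobEdge on it as a consumer (see JobVertex below).
		sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

		JobGraph jobGraph = new JobGraph("demo job", source, sink);

		// Prints the vertices in topological order: source before sink.
		System.out.println(jobGraph.getVerticesSortedTopologicallyFromSources());
	}
}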

/** * The base class for job vertexes. */public class JobVertex implements java.io.Serializable {   	private static final long serialVersionUID = 1L;	private static final String DEFAULT_NAME = "(unnamed vertex)";	// --------------------------------------------------------------------------------------------	// Members that define the structure / topology of the graph	// --------------------------------------------------------------------------------------------	/** The ID of the vertex. */	private final JobVertexID id;	/** The alternative IDs of the vertex. */	private final ArrayList
idAlternatives = new ArrayList<>(); /** The IDs of all operators contained in this vertex. */ private final ArrayList
operatorIDs = new ArrayList<>(); /** The alternative IDs of all operators contained in this vertex. */ private final ArrayList
operatorIdsAlternatives = new ArrayList<>(); /** List of produced data sets, one per writer */ private final ArrayList
results = new ArrayList
(); /** List of edges with incoming data. One per Reader. */ private final ArrayList
inputs = new ArrayList
(); /** Number of subtasks to split this task into at runtime.*/ private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; /** Maximum number of subtasks to split this task into a runtime. */ private int maxParallelism = -1; /** The minimum resource of the vertex */ private ResourceSpec minResources = ResourceSpec.DEFAULT; /** The preferred resource of the vertex */ private ResourceSpec preferredResources = ResourceSpec.DEFAULT; /** Custom configuration passed to the assigned task at runtime. */ private Configuration configuration; /** The class of the invokable. */ private String invokableClassName; /** Indicates of this job vertex is stoppable or not. */ private boolean isStoppable = false; /** Optionally, a source of input splits */ private InputSplitSource
inputSplitSource; /** The name of the vertex. This will be shown in runtime logs and will be in the runtime environment */ private String name; /** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot */ private SlotSharingGroup slotSharingGroup; /** The group inside which the vertex subtasks share slots */ private CoLocationGroup coLocationGroup; /** Optional, the name of the operator, such as 'Flat Map' or 'Join', to be included in the JSON plan */ private String operatorName; /** Optional, the description of the operator, like 'Hash Join', or 'Sorted Group Reduce', * to be included in the JSON plan */ private String operatorDescription; /** Optional, pretty name of the operator, to be displayed in the JSON plan */ private String operatorPrettyName; /** Optional, the JSON for the optimizer properties of the operator result, * to be included in the JSON plan */ private String resultOptimizerProperties; /** The input dependency constraint to schedule this vertex. */ private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY; // -------------------------------------------------------------------------------------------- /** * Constructs a new job vertex and assigns it with the given name. * * @param name The name of the new job vertex. */ public JobVertex(String name) { this(name, null); } /** * Constructs a new job vertex and assigns it with the given name. * * @param name The name of the new job vertex. * @param id The id of the job vertex. */ public JobVertex(String name, JobVertexID id) { this.name = name == null ? DEFAULT_NAME : name; this.id = id == null ? new JobVertexID() : id; // the id lists must have the same size this.operatorIDs.add(OperatorID.fromJobVertexID(this.id)); this.operatorIdsAlternatives.add(null); } /** * Constructs a new job vertex and assigns it with the given name. * * @param name The name of the new job vertex. * @param primaryId The id of the job vertex. * @param alternativeIds The alternative ids of the job vertex. * @param operatorIds The ids of all operators contained in this job vertex. * @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex- */ public JobVertex(String name, JobVertexID primaryId, List
alternativeIds, List
operatorIds, List
alternativeOperatorIds) { Preconditions.checkArgument(operatorIds.size() == alternativeOperatorIds.size()); this.name = name == null ? DEFAULT_NAME : name; this.id = primaryId == null ? new JobVertexID() : primaryId; this.idAlternatives.addAll(alternativeIds); this.operatorIDs.addAll(operatorIds); this.operatorIdsAlternatives.addAll(alternativeOperatorIds); }...... /** * Returns the number of inputs. * * @return The number of inputs. */ public int getNumberOfInputs() { return this.inputs.size(); } public List
getOperatorIDs() { return operatorIDs; } public List
getUserDefinedOperatorIDs() { return operatorIdsAlternatives; } /** * Returns the vertex's configuration object which can be used to pass custom settings to the task at runtime. * * @return the vertex's configuration object */ public Configuration getConfiguration() { if (this.configuration == null) { this.configuration = new Configuration(); } return this.configuration; } public void setInvokableClass(Class
invokable) { Preconditions.checkNotNull(invokable); this.invokableClassName = invokable.getName(); this.isStoppable = StoppableTask.class.isAssignableFrom(invokable); } /** * Returns the name of the invokable class which represents the task of this vertex. * * @return The name of the invokable class,
null if not set. */ public String getInvokableClassName() { return this.invokableClassName; } /** * Returns the invokable class which represents the task of this vertex * * @param cl The classloader used to resolve user-defined classes * @return The invokable class,
null if it is not set */ public Class
getInvokableClass(ClassLoader cl) { if (cl == null) { throw new NullPointerException("The classloader must not be null."); } if (invokableClassName == null) { return null; } try { return Class.forName(invokableClassName, true, cl).asSubclass(AbstractInvokable.class); } catch (ClassNotFoundException e) { throw new RuntimeException("The user-code class could not be resolved.", e); } catch (ClassCastException e) { throw new RuntimeException("The user-code class is no subclass of " + AbstractInvokable.class.getName(), e); } } /** * Gets the parallelism of the task. * * @return The parallelism of the task. */ public int getParallelism() { return parallelism; } /** * Sets the parallelism for the task. * * @param parallelism The parallelism for the task. */ public void setParallelism(int parallelism) { if (parallelism < 1) { throw new IllegalArgumentException("The parallelism must be at least one."); } this.parallelism = parallelism; }...... /** * Sets the minimum and preferred resources for the task. * * @param minResources The minimum resource for the task. * @param preferredResources The preferred resource for the task. */ public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) { this.minResources = checkNotNull(minResources); this.preferredResources = checkNotNull(preferredResources); } public InputSplitSource
getInputSplitSource() { return inputSplitSource; } public void setInputSplitSource(InputSplitSource
inputSplitSource) { this.inputSplitSource = inputSplitSource; } public List
getProducedDataSets() { return this.results; } public List
getInputs() { return this.inputs; } /** * Associates this vertex with a slot sharing group for scheduling. Different vertices in the same * slot sharing group can run one subtask each in the same slot. * * @param grp The slot sharing group to associate the vertex with. */ public void setSlotSharingGroup(SlotSharingGroup grp) { if (this.slotSharingGroup != null) { this.slotSharingGroup.removeVertexFromGroup(id); } this.slotSharingGroup = grp; if (grp != null) { grp.addVertexToGroup(id); } } /** * Gets the slot sharing group that this vertex is associated with. Different vertices in the same * slot sharing group can run one subtask each in the same slot. If the vertex is not associated with * a slot sharing group, this method returns {@code null}. * * @return The slot sharing group to associate the vertex with, or {@code null}, if not associated with one. */ public SlotSharingGroup getSlotSharingGroup() { return slotSharingGroup; }...... public void setStrictlyCoLocatedWith(JobVertex strictlyCoLocatedWith) { if (this.slotSharingGroup == null || this.slotSharingGroup != strictlyCoLocatedWith.slotSharingGroup) { throw new IllegalArgumentException("Strict co-location requires that both vertices are in the same slot sharing group."); } CoLocationGroup thisGroup = this.coLocationGroup; CoLocationGroup otherGroup = strictlyCoLocatedWith.coLocationGroup; if (otherGroup == null) { if (thisGroup == null) { CoLocationGroup group = new CoLocationGroup(this, strictlyCoLocatedWith); this.coLocationGroup = group; strictlyCoLocatedWith.coLocationGroup = group; } else { thisGroup.addVertex(strictlyCoLocatedWith); strictlyCoLocatedWith.coLocationGroup = thisGroup; } } else { if (thisGroup == null) { otherGroup.addVertex(this); this.coLocationGroup = otherGroup; } else { // both had yet distinct groups, we need to merge them thisGroup.mergeInto(otherGroup); } } } public CoLocationGroup getCoLocationGroup() { return coLocationGroup; } public void updateCoLocationGroup(CoLocationGroup group) { this.coLocationGroup = group; } // -------------------------------------------------------------------------------------------- public IntermediateDataSet createAndAddResultDataSet(ResultPartitionType partitionType) { return createAndAddResultDataSet(new IntermediateDataSetID(), partitionType); } public IntermediateDataSet createAndAddResultDataSet( IntermediateDataSetID id, ResultPartitionType partitionType) { IntermediateDataSet result = new IntermediateDataSet(id, partitionType, this); this.results.add(result); return result; } public JobEdge connectDataSetAsInput(IntermediateDataSet dataSet, DistributionPattern distPattern) { JobEdge edge = new JobEdge(dataSet, this, distPattern); this.inputs.add(edge); dataSet.addConsumer(edge); return edge; } public JobEdge connectNewDataSetAsInput( JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType) { IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType); JobEdge edge = new JobEdge(dataSet, this, distPattern); this.inputs.add(edge); dataSet.addConsumer(edge); return edge; } public void connectIdInput(IntermediateDataSetID dataSetId, DistributionPattern distPattern) { JobEdge edge = new JobEdge(dataSetId, this, distPattern); this.inputs.add(edge); } // -------------------------------------------------------------------------------------------- //能过下面两个函数,可以看出此数据结构的输入和输出分别为JobEdge和IntermediateDataSet public boolean isInputVertex() { return this.inputs.isEmpty(); } public boolean isOutputVertex() { return this.results.isEmpty(); 
} public boolean hasNoConnectedInputs() { for (JobEdge edge : inputs) { if (!edge.isIdReference()) { return false; } } return true; }......}

From this code we can see that a graph node's inputs and outputs are the JobEdge and IntermediateDataSet data structures, respectively. The vertex also holds OperatorID values, which identify the operators applied to the data stream, and it references SlotSharingGroup and CoLocationGroup, which tie into slot scheduling. Now look at the definitions of JobEdge and IntermediateDataSet:

public class JobEdge implements java.io.Serializable {

	private static final long serialVersionUID = 1L;

	/** The vertex connected to this edge. */
	private final JobVertex target;

	/** The distribution pattern that should be used for this job edge. */
	private final DistributionPattern distributionPattern;

	/** The data set at the source of the edge, may be null if the edge is not yet connected*/
	private IntermediateDataSet source;

	/** The id of the source intermediate data set */
	private IntermediateDataSetID sourceId;

	/** Optional name for the data shipping strategy (forward, partition hash, rebalance, ...),
	 * to be displayed in the JSON plan */
	private String shipStrategyName;

	/** Optional name for the pre-processing operation (sort, combining sort, ...),
	 * to be displayed in the JSON plan */
	private String preProcessingOperationName;

	/** Optional description of the caching inside an operator, to be displayed in the JSON plan */
	private String operatorLevelCachingDescription;

	/**
	 * Constructs a new job edge, that connects an intermediate result to a consumer task.
	 *
	 * @param source The data set that is at the source of this edge.
	 * @param target The operation that is at the target of this edge.
	 * @param distributionPattern The pattern that defines how the connection behaves in parallel.
	 */
	public JobEdge(IntermediateDataSet source, JobVertex target, DistributionPattern distributionPattern) {
		if (source == null || target == null || distributionPattern == null) {
			throw new NullPointerException();
		}
		this.target = target;
		this.distributionPattern = distributionPattern;
		this.source = source;
		this.sourceId = source.getId();
	}

	/**
	 * Constructs a new job edge that refers to an intermediate result via the Id, rather than directly through
	 * the intermediate data set structure.
	 *
	 * @param sourceId The id of the data set that is at the source of this edge.
	 * @param target The operation that is at the target of this edge.
	 * @param distributionPattern The pattern that defines how the connection behaves in parallel.
	 */
	public JobEdge(IntermediateDataSetID sourceId, JobVertex target, DistributionPattern distributionPattern) {
		if (sourceId == null || target == null || distributionPattern == null) {
			throw new NullPointerException();
		}
		this.target = target;
		this.distributionPattern = distributionPattern;
		this.sourceId = sourceId;
	}

	/**
	 * Returns the data set at the source of the edge. May be null, if the edge refers to the source via an ID
	 * and has not been connected.
	 *
	 * @return The data set at the source of the edge
	 */
	public IntermediateDataSet getSource() {
		return source;
	}

	/**
	 * Returns the vertex connected to this edge.
	 *
	 * @return The vertex connected to this edge.
	 */
	public JobVertex getTarget() {
		return target;
	}

	/**
	 * Returns the distribution pattern used for this edge.
	 *
	 * @return The distribution pattern used for this edge.
	 */
	public DistributionPattern getDistributionPattern(){
		return this.distributionPattern;
	}

	/**
	 * Gets the ID of the consumed data set.
	 *
	 * @return The ID of the consumed data set.
	 */
	public IntermediateDataSetID getSourceId() {
		return sourceId;
	}

	public boolean isIdReference() {
		return this.source == null;
	}

	// --------------------------------------------------------------------------------------------

	public void connecDataSet(IntermediateDataSet dataSet) {
		if (dataSet == null) {
			throw new NullPointerException();
		}
		if (this.source != null) {
			throw new IllegalStateException("The edge is already connected.");
		}
		if (!dataSet.getId().equals(sourceId)) {
			throw new IllegalArgumentException("The data set to connect does not match the sourceId.");
		}

		this.source = dataSet;
	}

	......
}

public class IntermediateDataSet implements java.io.Serializable {

	private static final long serialVersionUID = 1L;

	private final IntermediateDataSetID id; 		// the identifier

	private final JobVertex producer;			// the operation that produced this data set

	private final List<JobEdge> consumers = new ArrayList<JobEdge>();

	// The type of partition to use at runtime
	private final ResultPartitionType resultType;

	// --------------------------------------------------------------------------------------------

	public IntermediateDataSet(IntermediateDataSetID id, ResultPartitionType resultType, JobVertex producer) {
		this.id = checkNotNull(id);
		this.producer = checkNotNull(producer);
		this.resultType = checkNotNull(resultType);
	}

	// --------------------------------------------------------------------------------------------

	public IntermediateDataSetID getId() {
		return id;
	}

	public JobVertex getProducer() {
		return producer;
	}

	public List<JobEdge> getConsumers() {
		return this.consumers;
	}

	public ResultPartitionType getResultType() {
		return resultType;
	}

	// --------------------------------------------------------------------------------------------

	public void addConsumer(JobEdge edge) {
		this.consumers.add(edge);
	}

	// --------------------------------------------------------------------------------------------

	@Override
	public String toString() {
		return "Intermediate Data Set (" + id + ")";
	}
}

Notice how these classes are all interlinked, and that every one of them implements Serializable. That is not an accident: the whole JobGraph, together with its vertices, edges and intermediate data sets, has to be serialized and shipped to the cluster, otherwise implementing that interface would be pointless. The round trip looks roughly like the sketch below.
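A small sketch of my own, assuming Flink's general-purpose serialization helper InstantiationUtil is available; the actual submission path wraps this inside the client/RPC machinery rather than calling it directly like this.

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.InstantiationUtil;

public class JobGraphShipping {
	public static void main(String[] args) throws Exception {
		JobGraph jobGraph = new JobGraph("demo job");

		// Conceptually what happens before the graph leaves the client:
		// the whole object graph (vertices, edges, intermediate data sets) becomes bytes.
		byte[] bytes = InstantiationUtil.serializeObject(jobGraph);

		// ... and what the receiving side does to get the graph back.
		JobGraph restored = InstantiationUtil.deserializeObject(bytes, JobGraph.class.getClassLoader());

		System.out.println(restored); // JobGraph(jobId: ...) - same id as the original
	}
}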

2. The Dispatcher

In standalone mode, let's look at how the dispatcher is created.

The creation chain is: StandaloneSessionClusterEntrypoint -> SessionDispatcherResourceManagerComponentFactory -> createDispatcherResourceManagerComponent -> createDispatcher (StandaloneDispatcher).

public class StandaloneDispatcher extends Dispatcher {
	public StandaloneDispatcher(
			RpcService rpcService,
			String endpointId,
			Configuration configuration,
			HighAvailabilityServices highAvailabilityServices,
			GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
			BlobServer blobServer,
			HeartbeatServices heartbeatServices,
			JobManagerMetricGroup jobManagerMetricGroup,
			@Nullable String metricQueryServicePath,
			ArchivedExecutionGraphStore archivedExecutionGraphStore,
			JobManagerRunnerFactory jobManagerRunnerFactory,
			FatalErrorHandler fatalErrorHandler,
			HistoryServerArchivist historyServerArchivist) throws Exception {
		super(
			rpcService,
			endpointId,
			configuration,
			highAvailabilityServices,
			highAvailabilityServices.getSubmittedJobGraphStore(), // note this call
			resourceManagerGatewayRetriever,
			blobServer,
			heartbeatServices,
			jobManagerMetricGroup,
			metricQueryServicePath,
			archivedExecutionGraphStore,
			jobManagerRunnerFactory,
			fatalErrorHandler,
			historyServerArchivist);
	}
}

getSubmittedJobGraphStore returns a handle to the store that holds the submitted JobGraph instances. Here is the definition of that interface:

public interface SubmittedJobGraphStore {

	/**
	 * Starts the {@link SubmittedJobGraphStore} service.
	 */
	void start(SubmittedJobGraphListener jobGraphListener) throws Exception;

	/**
	 * Stops the {@link SubmittedJobGraphStore} service.
	 */
	void stop() throws Exception;

	/**
	 * Returns the {@link SubmittedJobGraph} with the given {@link JobID} or
	 * {@code null} if no job was registered.
	 */
	@Nullable
	SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception;

	/**
	 * Adds the {@link SubmittedJobGraph} instance.
	 *
	 * <p>If a job graph with the same {@link JobID} exists, it is replaced.
	 */
	void putJobGraph(SubmittedJobGraph jobGraph) throws Exception;

	/**
	 * Removes the {@link SubmittedJobGraph} with the given {@link JobID} if it exists.
	 */
	void removeJobGraph(JobID jobId) throws Exception;

	/**
	 * Releases the locks on the specified {@link JobGraph}.
	 *
	 * <p>Releasing the locks allows that another instance can delete the job from
	 * the {@link SubmittedJobGraphStore}.
	 *
	 * @param jobId specifying the job to release the locks for
	 * @throws Exception if the locks cannot be released
	 */
	void releaseJobGraph(JobID jobId) throws Exception;

	/**
	 * Get all job ids of submitted job graphs to the submitted job graph store.
	 *
	 * @return Collection of submitted job ids
	 * @throws Exception if the operation fails
	 */
	Collection<JobID> getJobIds() throws Exception;

	/**
	 * A listener for {@link SubmittedJobGraph} instances. This is used to react to races between
	 * multiple running {@link SubmittedJobGraphStore} instances (on multiple job managers).
	 */
	interface SubmittedJobGraphListener {

		/**
		 * Callback for {@link SubmittedJobGraph} instances added by a different {@link
		 * SubmittedJobGraphStore} instance.
		 *
		 * <p>Important: It is possible to get false positives and be notified
		 * about a job graph, which was added by this instance.
		 *
		 * @param jobId The {@link JobID} of the added job graph
		 */
		void onAddedJobGraph(JobID jobId);

		/**
		 * Callback for {@link SubmittedJobGraph} instances removed by a different {@link
		 * SubmittedJobGraphStore} instance.
		 *
		 * @param jobId The {@link JobID} of the removed job graph
		 */
		void onRemovedJobGraph(JobID jobId);
	}
}

In other words, when the HA services analyzed earlier are created, this store is created along with them; it is then started to manage the submitted JobGraphs, and through it the dispatcher gets control of the job graphs. To make the contract concrete, a minimal in-memory implementation could look like the sketch below.
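This is purely illustrative (my own sketch, not one of Flink's real implementations, which add locking and HA persistence); the import locations are an assumption, while the interface methods and SubmittedJobGraph.getJobGraph() come from the code quoted in this article.

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;

/** Toy, non-HA store: everything lives in a map, nothing is persisted or locked. */
public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore {

	private final Map<JobID, SubmittedJobGraph> graphs = new ConcurrentHashMap<>();

	@Override
	public void start(SubmittedJobGraphListener jobGraphListener) {
		// a single-instance store never sees graphs added by another dispatcher,
		// so the listener is simply ignored here
	}

	@Override
	public void stop() {}

	@Override
	public SubmittedJobGraph recoverJobGraph(JobID jobId) {
		return graphs.get(jobId);
	}

	@Override
	public void putJobGraph(SubmittedJobGraph jobGraph) {
		graphs.put(jobGraph.getJobGraph().getJobID(), jobGraph);
	}

	@Override
	public void removeJobGraph(JobID jobId) {
		graphs.remove(jobId);
	}

	@Override
	public void releaseJobGraph(JobID jobId) {
		// nothing to release without distributed locks
	}

	@Override
	public Collection<JobID> getJobIds() {
		return graphs.keySet();
	}
}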

3. Distributing the JobGraph

So how is the job graph actually distributed? A hedged sketch of a submission call from the gateway side comes first; after that, here is the definition of the abstract base class of all Dispatchers:
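The sketch below is my own illustration and uses only the submitJob signature that appears in the Dispatcher code that follows; how the DispatcherGateway reference is obtained (e.g. through the RPC service or the REST client) is deliberately left out.

import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;

public class SubmitSketch {

	/** Illustrative only: submit a JobGraph through an already-retrieved DispatcherGateway. */
	public static void submit(DispatcherGateway dispatcherGateway, JobGraph jobGraph) throws Exception {
		CompletableFuture<Acknowledge> ack = dispatcherGateway.submitJob(jobGraph, Time.seconds(10));
		// On the server side, Dispatcher.submitJob() persists the graph in the
		// SubmittedJobGraphStore and then spawns a JobManagerRunner for it
		// (persistAndRunJob -> runJob -> createJobManagerRunner), as shown below.
		ack.get();
	}
}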

public abstract class Dispatcher extends FencedRpcEndpoint
implements DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener { public static final String DISPATCHER_NAME = "dispatcher"; private final Configuration configuration; private final SubmittedJobGraphStore submittedJobGraphStore; private final RunningJobsRegistry runningJobsRegistry; private final HighAvailabilityServices highAvailabilityServices; private final GatewayRetriever
resourceManagerGatewayRetriever; private final JobManagerSharedServices jobManagerSharedServices; private final HeartbeatServices heartbeatServices; private final BlobServer blobServer; private final FatalErrorHandler fatalErrorHandler; private final Map
> jobManagerRunnerFutures; private final LeaderElectionService leaderElectionService; private final ArchivedExecutionGraphStore archivedExecutionGraphStore; private final JobManagerRunnerFactory jobManagerRunnerFactory; private final JobManagerMetricGroup jobManagerMetricGroup; private final HistoryServerArchivist historyServerArchivist; @Nullable private final String metricQueryServicePath; private final Map
> jobManagerTerminationFutures; private CompletableFuture
recoveryOperation = CompletableFuture.completedFuture(null); public Dispatcher( RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, SubmittedJobGraphStore submittedJobGraphStore, GatewayRetriever
resourceManagerGatewayRetriever, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricServiceQueryPath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, HistoryServerArchivist historyServerArchivist) throws Exception { super(rpcService, endpointId); this.configuration = Preconditions.checkNotNull(configuration); this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); this.resourceManagerGatewayRetriever = Preconditions.checkNotNull(resourceManagerGatewayRetriever); this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices); this.blobServer = Preconditions.checkNotNull(blobServer); this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler); this.submittedJobGraphStore = Preconditions.checkNotNull(submittedJobGraphStore); this.jobManagerMetricGroup = Preconditions.checkNotNull(jobManagerMetricGroup); this.metricQueryServicePath = metricServiceQueryPath; this.jobManagerSharedServices = JobManagerSharedServices.fromConfiguration( configuration, this.blobServer); this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry(); jobManagerRunnerFutures = new HashMap<>(16); leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService(); this.historyServerArchivist = Preconditions.checkNotNull(historyServerArchivist); this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore); this.jobManagerRunnerFactory = Preconditions.checkNotNull(jobManagerRunnerFactory); this.jobManagerTerminationFutures = new HashMap<>(2); } //------------------------------------------------------ // Lifecycle methods //------------------------------------------------------ @Override public void onStart() throws Exception { try { startDispatcherServices(); } catch (Exception e) { final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), e); onFatalError(exception); throw exception; } } private void startDispatcherServices() throws Exception { try { submittedJobGraphStore.start(this); leaderElectionService.start(this); registerDispatcherMetrics(jobManagerMetricGroup); } catch (Exception e) { handleStartDispatcherServicesException(e); } } private void handleStartDispatcherServicesException(Exception e) throws Exception { try { stopDispatcherServices(); } catch (Exception exception) { e.addSuppressed(exception); } throw e; } @Override public CompletableFuture
onStop() { log.info("Stopping dispatcher {}.", getAddress()); final CompletableFuture
allJobManagerRunnersTerminationFuture = terminateJobManagerRunnersAndGetTerminationFuture(); return FutureUtils.runAfterwards( allJobManagerRunnersTerminationFuture, () -> { stopDispatcherServices(); log.info("Stopped dispatcher {}.", getAddress()); }); } private void stopDispatcherServices() throws Exception { Exception exception = null; try { jobManagerSharedServices.shutdown(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } try { submittedJobGraphStore.stop(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } try { leaderElectionService.stop(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } jobManagerMetricGroup.close(); ExceptionUtils.tryRethrowException(exception); } //------------------------------------------------------ // RPCs //------------------------------------------------------ @Override public CompletableFuture
submitJob(JobGraph jobGraph, Time timeout) { log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName()); try { if (isDuplicateJob(jobGraph.getJobID())) { return FutureUtils.completedExceptionally( new JobSubmissionException(jobGraph.getJobID(), "Job has already been submitted.")); } else { return internalSubmitJob(jobGraph); } } catch (FlinkException e) { return FutureUtils.completedExceptionally(e); } } /** * Checks whether the given job has already been submitted or executed. * * @param jobId identifying the submitted job * @return true if the job has already been submitted (is running) or has been executed * @throws FlinkException if the job scheduling status cannot be retrieved */ private boolean isDuplicateJob(JobID jobId) throws FlinkException { final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus; try { jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId); } catch (IOException e) { throw new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), e); } return jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunnerFutures.containsKey(jobId); } private CompletableFuture
internalSubmitJob(JobGraph jobGraph) { log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName()); final CompletableFuture
persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob) .thenApply(ignored -> Acknowledge.get()); return persistAndRunFuture.handleAsync((acknowledge, throwable) -> { if (throwable != null) { cleanUpJobData(jobGraph.getJobID(), true); final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable); throw new CompletionException( new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable)); } else { return acknowledge; } }, getRpcService().getExecutor()); } private CompletableFuture
persistAndRunJob(JobGraph jobGraph) throws Exception { submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph)); final CompletableFuture
runJobFuture = runJob(jobGraph); return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> { if (throwable != null) { submittedJobGraphStore.removeJobGraph(jobGraph.getJobID()); } })); } private CompletableFuture
runJob(JobGraph jobGraph) { Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID())); final CompletableFuture
jobManagerRunnerFuture = createJobManagerRunner(jobGraph); jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture); return jobManagerRunnerFuture .thenApply(FunctionUtils.nullFn()) .whenCompleteAsync( (ignored, throwable) -> { if (throwable != null) { jobManagerRunnerFutures.remove(jobGraph.getJobID()); } }, getMainThreadExecutor()); } private CompletableFuture
createJobManagerRunner(JobGraph jobGraph) { final RpcService rpcService = getRpcService(); final CompletableFuture
jobManagerRunnerFuture = CompletableFuture.supplyAsync( CheckedSupplier.unchecked(() -> jobManagerRunnerFactory.createJobManagerRunner( jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup), fatalErrorHandler)), rpcService.getExecutor()); return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner)); } private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception { final JobID jobId = jobManagerRunner.getJobGraph().getJobID(); jobManagerRunner.getResultFuture().whenCompleteAsync( (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> { // check if we are still the active JobManagerRunner by checking the identity //noinspection ObjectEquality if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) { if (archivedExecutionGraph != null) { jobReachedGloballyTerminalState(archivedExecutionGraph); } else { final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); if (strippedThrowable instanceof JobNotFinishedException) { jobNotFinished(jobId); } else { jobMasterFailed(jobId, strippedThrowable); } } } else { log.debug("There is a newer JobManagerRunner for the job {}.", jobId); } }, getMainThreadExecutor()); jobManagerRunner.start(); return jobManagerRunner; } @Override public CompletableFuture
> listJobs(Time timeout) { return CompletableFuture.completedFuture( Collections.unmodifiableSet(new HashSet<>(jobManagerRunnerFutures.keySet()))); } @Override public CompletableFuture
disposeSavepoint(String savepointPath, Time timeout) { final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); return CompletableFuture.supplyAsync( () -> { log.info("Disposing savepoint {}.", savepointPath); try { Checkpoints.disposeSavepoint(savepointPath, configuration, classLoader, log); } catch (IOException | FlinkException e) { throw new CompletionException(new FlinkException(String.format("Could not dispose savepoint %s.", savepointPath), e)); } return Acknowledge.get(); }, jobManagerSharedServices.getScheduledExecutorService()); } @Override public CompletableFuture
cancelJob(JobID jobId, Time timeout) { final CompletableFuture
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId); return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.cancel(timeout)); } @Override public CompletableFuture
stopJob(JobID jobId, Time timeout) { final CompletableFuture
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId); return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.stop(timeout)); } @Override public CompletableFuture
rescaleJob(JobID jobId, int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) { final CompletableFuture
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId); return jobMasterGatewayFuture.thenCompose( (JobMasterGateway jobMasterGateway) -> jobMasterGateway.rescaleJob(newParallelism, rescalingBehaviour, timeout)); } @Override public CompletableFuture
requestClusterOverview(Time timeout) { CompletableFuture
taskManagerOverviewFuture = runResourceManagerCommand(resourceManagerGateway -> resourceManagerGateway.requestResourceOverview(timeout)); final List
>> optionalJobInformation = queryJobMastersForInformation( (JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobStatus(timeout)); CompletableFuture
>> allOptionalJobsFuture = FutureUtils.combineAll(optionalJobInformation); CompletableFuture
> allJobsFuture = allOptionalJobsFuture.thenApply(this::flattenOptionalCollection); final JobsOverview completedJobsOverview = archivedExecutionGraphStore.getStoredJobsOverview(); return allJobsFuture.thenCombine( taskManagerOverviewFuture, (Collection
runningJobsStatus, ResourceOverview resourceOverview) -> { final JobsOverview allJobsOverview = JobsOverview.create(runningJobsStatus).combine(completedJobsOverview); return new ClusterOverview(resourceOverview, allJobsOverview); }); } @Override public CompletableFuture
requestMultipleJobDetails(Time timeout) { List
>> individualOptionalJobDetails = queryJobMastersForInformation( (JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobDetails(timeout)); CompletableFuture
>> optionalCombinedJobDetails = FutureUtils.combineAll( individualOptionalJobDetails); CompletableFuture
> combinedJobDetails = optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection); final Collection
completedJobDetails = archivedExecutionGraphStore.getAvailableJobDetails(); return combinedJobDetails.thenApply( (Collection
runningJobDetails) -> { final Collection
allJobDetails = new ArrayList<>(completedJobDetails.size() + runningJobDetails.size()); allJobDetails.addAll(runningJobDetails); allJobDetails.addAll(completedJobDetails); return new MultipleJobsDetails(allJobDetails); }); } @Override public CompletableFuture
requestJobStatus(JobID jobId, Time timeout) { final CompletableFuture
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId); final CompletableFuture
jobStatusFuture = jobMasterGatewayFuture.thenCompose( (JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobStatus(timeout)); return jobStatusFuture.exceptionally( (Throwable throwable) -> { final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId); // check whether it is a completed job if (jobDetails == null) { throw new CompletionException(ExceptionUtils.stripCompletionException(throwable)); } else { return jobDetails.getStatus(); } }); } @Override public CompletableFuture
requestOperatorBackPressureStats( final JobID jobId, final JobVertexID jobVertexId) { final CompletableFuture
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId); return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestOperatorBackPressureStats(jobVertexId)); } @Override public CompletableFuture
requestJob(JobID jobId, Time timeout) { final CompletableFuture
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId); final CompletableFuture
archivedExecutionGraphFuture = jobMasterGatewayFuture.thenCompose( (JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJob(timeout)); return archivedExecutionGraphFuture.exceptionally( (Throwable throwable) -> { final ArchivedExecutionGraph serializableExecutionGraph = archivedExecutionGraphStore.get(jobId); // check whether it is a completed job if (serializableExecutionGraph == null) { throw new CompletionException(ExceptionUtils.stripCompletionException(throwable)); } else { return serializableExecutionGraph; } }); } @Override public CompletableFuture
requestJobResult(JobID jobId, Time timeout) { final CompletableFuture
jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId); if (jobManagerRunnerFuture == null) { final ArchivedExecutionGraph archivedExecutionGraph = archivedExecutionGraphStore.get(jobId); if (archivedExecutionGraph == null) { return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); } else { return CompletableFuture.completedFuture(JobResult.createFrom(archivedExecutionGraph)); } } else { return jobManagerRunnerFuture.thenCompose(JobManagerRunner::getResultFuture).thenApply(JobResult::createFrom); } } @Override public CompletableFuture
> requestMetricQueryServicePaths(Time timeout) { if (metricQueryServicePath != null) { return CompletableFuture.completedFuture(Collections.singleton(metricQueryServicePath)); } else { return CompletableFuture.completedFuture(Collections.emptyList()); } } @Override public CompletableFuture
>> requestTaskManagerMetricQueryServicePaths(Time timeout) { return runResourceManagerCommand(resourceManagerGateway -> resourceManagerGateway.requestTaskManagerMetricQueryServicePaths(timeout)); } @Override public CompletableFuture
getBlobServerPort(Time timeout) { return CompletableFuture.completedFuture(blobServer.getPort()); } @Override public CompletableFuture
triggerSavepoint( final JobID jobId, final String targetDirectory, final boolean cancelJob, final Time timeout) { final CompletableFuture
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId); return jobMasterGatewayFuture.thenCompose( (JobMasterGateway jobMasterGateway) -> jobMasterGateway.triggerSavepoint(targetDirectory, cancelJob, timeout)); } @Override public CompletableFuture
shutDownCluster() { closeAsync(); return CompletableFuture.completedFuture(Acknowledge.get()); } /** * Cleans up the job related data from the dispatcher. If cleanupHA is true, then * the data will also be removed from HA. * * @param jobId JobID identifying the job to clean up * @param cleanupHA True iff HA data shall also be cleaned up */ private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean cleanupHA) { final CompletableFuture
cleanupFuture = removeJob(jobId, cleanupHA); registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture); } private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableFuture
jobManagerRunnerTerminationFuture) { Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId)); jobManagerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture); // clean up the pending termination future jobManagerRunnerTerminationFuture.thenRunAsync( () -> { final CompletableFuture
terminationFuture = jobManagerTerminationFutures.remove(jobId); //noinspection ObjectEquality if (terminationFuture != null && terminationFuture != jobManagerRunnerTerminationFuture) { jobManagerTerminationFutures.put(jobId, terminationFuture); } }, getUnfencedMainThreadExecutor()); } private CompletableFuture
removeJob(JobID jobId, boolean cleanupHA) { CompletableFuture
jobManagerRunnerFuture = jobManagerRunnerFutures.remove(jobId); final CompletableFuture
jobManagerRunnerTerminationFuture; if (jobManagerRunnerFuture != null) { jobManagerRunnerTerminationFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::closeAsync); } else { jobManagerRunnerTerminationFuture = CompletableFuture.completedFuture(null); } return jobManagerRunnerTerminationFuture.thenRunAsync( () -> cleanUpJobData(jobId, cleanupHA), getRpcService().getExecutor()); } private void cleanUpJobData(JobID jobId, boolean cleanupHA) { jobManagerMetricGroup.removeJob(jobId); boolean cleanupHABlobs = false; if (cleanupHA) { try { submittedJobGraphStore.removeJobGraph(jobId); // only clean up the HA blobs if we could remove the job from HA storage cleanupHABlobs = true; } catch (Exception e) { log.warn("Could not properly remove job {} from submitted job graph store.", jobId, e); } try { runningJobsRegistry.clearJob(jobId); } catch (IOException e) { log.warn("Could not properly remove job {} from the running jobs registry.", jobId, e); } } else { try { submittedJobGraphStore.releaseJobGraph(jobId); } catch (Exception e) { log.warn("Could not properly release job {} from submitted job graph store.", jobId, e); } } blobServer.cleanupJob(jobId, cleanupHABlobs); } /** * Terminate all currently running {@link JobManagerRunner}. */ private void terminateJobManagerRunners() { log.info("Stopping all currently running jobs of dispatcher {}.", getAddress()); final HashSet
jobsToRemove = new HashSet<>(jobManagerRunnerFutures.keySet()); for (JobID jobId : jobsToRemove) { removeJobAndRegisterTerminationFuture(jobId, false); } } private CompletableFuture
terminateJobManagerRunnersAndGetTerminationFuture() { terminateJobManagerRunners(); final Collection
> values = jobManagerTerminationFutures.values(); return FutureUtils.completeAll(values); } /** * Recovers all jobs persisted via the submitted job graph store. */ @VisibleForTesting Collection
recoverJobs() throws Exception { log.info("Recovering all persisted jobs."); final Collection
    jobIds = submittedJobGraphStore.getJobIds();

    try {
        return recoverJobGraphs(jobIds);
    } catch (Exception e) {
        // release all recovered job graphs
        for (JobID jobId : jobIds) {
            try {
                submittedJobGraphStore.releaseJobGraph(jobId);
            } catch (Exception ie) {
                e.addSuppressed(ie);
            }
        }
        throw e;
    }
}

@Nonnull
private Collection<JobGraph> recoverJobGraphs(Collection<JobID> jobIds) throws Exception {
    final List<JobGraph> jobGraphs = new ArrayList<>(jobIds.size());

    for (JobID jobId : jobIds) {
        final JobGraph jobGraph = recoverJob(jobId);

        if (jobGraph == null) {
            throw new FlinkJobNotFoundException(jobId);
        }

        jobGraphs.add(jobGraph);
    }

    return jobGraphs;
}

@Nullable
private JobGraph recoverJob(JobID jobId) throws Exception {
    log.debug("Recover job {}.", jobId);
    final SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);

    if (submittedJobGraph != null) {
        return submittedJobGraph.getJobGraph();
    } else {
        return null;
    }
}

protected void onFatalError(Throwable throwable) {
    fatalErrorHandler.onFatalError(throwable);
}

protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
    Preconditions.checkArgument(
        archivedExecutionGraph.getState().isGloballyTerminalState(),
        "Job %s is in state %s which is not globally terminal.",
        archivedExecutionGraph.getJobID(),
        archivedExecutionGraph.getState());

    log.info("Job {} reached globally terminal state {}.", archivedExecutionGraph.getJobID(), archivedExecutionGraph.getState());

    archiveExecutionGraph(archivedExecutionGraph);

    final JobID jobId = archivedExecutionGraph.getJobID();

    removeJobAndRegisterTerminationFuture(jobId, true);
}

private void archiveExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) {
    try {
        archivedExecutionGraphStore.put(archivedExecutionGraph);
    } catch (IOException e) {
        log.info(
            "Could not store completed job {}({}).",
            archivedExecutionGraph.getJobName(),
            archivedExecutionGraph.getJobID(),
            e);
    }

    final CompletableFuture<Acknowledge> executionGraphFuture = historyServerArchivist.archiveExecutionGraph(archivedExecutionGraph);

    executionGraphFuture.whenComplete(
        (Acknowledge ignored, Throwable throwable) -> {
            if (throwable != null) {
                log.info(
                    "Could not archive completed job {}({}) to the history server.",
                    archivedExecutionGraph.getJobName(),
                    archivedExecutionGraph.getJobID(),
                    throwable);
            }
        });
}

protected void jobNotFinished(JobID jobId) {
    log.info("Job {} was not finished by JobManager.", jobId);

    removeJobAndRegisterTerminationFuture(jobId, false);
}

private void jobMasterFailed(JobID jobId, Throwable cause) {
    // we fail fatally in case of a JobMaster failure in order to restart the
    // dispatcher to recover the jobs again. This only works in HA mode, though
    onFatalError(new FlinkException(String.format("JobMaster for job %s failed.", jobId), cause));
}

private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
    final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);

    if (jobManagerRunnerFuture == null) {
        return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
    } else {
        final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::getLeaderGatewayFuture);
        return leaderGatewayFuture.thenApplyAsync(
            (JobMasterGateway jobMasterGateway) -> {
                // check whether the retrieved JobMasterGateway belongs still to a running JobMaster
                if (jobManagerRunnerFutures.containsKey(jobId)) {
                    return jobMasterGateway;
                } else {
                    throw new CompletionException(new FlinkJobNotFoundException(jobId));
                }
            },
            getMainThreadExecutor());
    }
}

private CompletableFuture<ResourceManagerGateway> getResourceManagerGateway() {
    return resourceManagerGatewayRetriever.getFuture();
}

private <T> CompletableFuture<T> runResourceManagerCommand(Function<ResourceManagerGateway, CompletableFuture<T>> resourceManagerCommand) {
    return getResourceManagerGateway().thenApply(resourceManagerCommand).thenCompose(Function.identity());
}

private <T> List<T> flattenOptionalCollection(Collection<Optional<T>> optionalCollection) {
    return optionalCollection.stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
}

@Nonnull
private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> queryFunction) {
    final int numberJobsRunning = jobManagerRunnerFutures.size();

    ArrayList<CompletableFuture<Optional<T>>> optionalJobInformation = new ArrayList<>(
        numberJobsRunning);

    for (JobID jobId : jobManagerRunnerFutures.keySet()) {
        final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);

        final CompletableFuture<Optional<T>> optionalRequest = jobMasterGatewayFuture
            .thenCompose(queryFunction::apply)
            .handle((T value, Throwable throwable) -> Optional.ofNullable(value));

        optionalJobInformation.add(optionalRequest);
    }
    return optionalJobInformation;
}

//------------------------------------------------------
// Leader contender
//------------------------------------------------------

/**
 * Callback method when current resourceManager is granted leadership.
 *
 * @param newLeaderSessionID unique leadershipID
 */
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
    runAsyncWithoutFencing(
        () -> {
            log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);

            final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoveryOperation.thenApplyAsync(
                FunctionUtils.uncheckedFunction(ignored -> recoverJobs()),
                getRpcService().getExecutor());

            final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
                (Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
                getUnfencedMainThreadExecutor());

            final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenCombineAsync(
                recoveredJobsFuture,
                BiFunctionWithException.unchecked((Boolean confirmLeadership, Collection<JobGraph> recoveredJobs) -> {
                    if (confirmLeadership) {
                        leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
                    } else {
                        for (JobGraph recoveredJob : recoveredJobs) {
                            submittedJobGraphStore.releaseJobGraph(recoveredJob.getJobID());
                        }
                    }
                    return null;
                }),
                getRpcService().getExecutor());

            confirmationFuture.whenComplete(
                (Void ignored, Throwable throwable) -> {
                    if (throwable != null) {
                        onFatalError(
                            new DispatcherException(
                                String.format("Failed to take leadership with session id %s.", newLeaderSessionID),
                                (ExceptionUtils.stripCompletionException(throwable))));
                    }
                });

            recoveryOperation = confirmationFuture;
        });
}

private CompletableFuture<Boolean> tryAcceptLeadershipAndRunJobs(UUID newLeaderSessionID, Collection<JobGraph> recoveredJobs) {
    final DispatcherId dispatcherId = DispatcherId.fromUuid(newLeaderSessionID);

    if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
        log.debug("Dispatcher {} accepted leadership with fencing token {}. Start recovered jobs.", getAddress(), dispatcherId);
        setNewFencingToken(dispatcherId);

        Collection<CompletableFuture<?>> runFutures = new ArrayList<>(recoveredJobs.size());

        for (JobGraph recoveredJob : recoveredJobs) {
            final CompletableFuture<?> runFuture = waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob);
            runFutures.add(runFuture);
        }

        return FutureUtils.waitForAll(runFutures).thenApply(ignored -> true);
    } else {
        log.debug("Dispatcher {} lost leadership before accepting it. Stop recovering jobs for fencing token {}.", getAddress(), dispatcherId);

        return CompletableFuture.completedFuture(false);
    }
}

private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, FunctionWithException<JobGraph, CompletableFuture<Void>, ?> action) {
    final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
        .exceptionally((Throwable throwable) -> {
            throw new CompletionException(
                new DispatcherException(
                    String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId),
                    throwable));
        });

    return jobManagerTerminationFuture.thenComposeAsync(
        FunctionUtils.uncheckedFunction((ignored) -> {
            jobManagerTerminationFutures.remove(jobId);
            return action.apply(jobGraph);
        }),
        getMainThreadExecutor());
}

CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
    if (jobManagerRunnerFutures.containsKey(jobId)) {
        return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
    } else {
        return jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
    }
}

@VisibleForTesting
CompletableFuture<Void> getRecoveryOperation() {
    return recoveryOperation;
}

private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
    // clear the state if we've been the leader before
    if (getFencingToken() != null) {
        clearDispatcherState();
    }

    setFencingToken(dispatcherId);
}

private void clearDispatcherState() {
    terminateJobManagerRunners();
}

private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) {
    jobManagerMetricGroup.gauge(MetricNames.NUM_RUNNING_JOBS,
        () -> (long) jobManagerRunnerFutures.size());
}

/**
 * Callback method when current resourceManager loses leadership.
 */
@Override
public void revokeLeadership() {
    runAsyncWithoutFencing(
        () -> {
            log.info("Dispatcher {} was revoked leadership.", getAddress());

            setNewFencingToken(null);
        });
}

/**
 * Handles error occurring in the leader election service.
 *
 * @param exception Exception being thrown in the leader election service
 */
@Override
public void handleError(final Exception exception) {
    onFatalError(new DispatcherException("Received an error from the LeaderElectionService.", exception));
}

//------------------------------------------------------
// SubmittedJobGraphListener
//------------------------------------------------------

@Override
public void onAddedJobGraph(final JobID jobId) {
    runAsync(
        () -> {
            if (!jobManagerRunnerFutures.containsKey(jobId)) {
                // IMPORTANT: onAddedJobGraph can generate false positives and, thus, we must expect that
                // the specified job is already removed from the SubmittedJobGraphStore. In this case,
                // SubmittedJobGraphStore.recoverJob returns null.
                final CompletableFuture<Optional<JobGraph>> recoveredJob = recoveryOperation.thenApplyAsync(
                    FunctionUtils.uncheckedFunction(ignored -> Optional.ofNullable(recoverJob(jobId))),
                    getRpcService().getExecutor());

                final DispatcherId dispatcherId = getFencingToken();
                final CompletableFuture<Void> submissionFuture = recoveredJob.thenComposeAsync(
                    (Optional<JobGraph> jobGraphOptional) -> jobGraphOptional.map(
                        FunctionUtils.uncheckedFunction(jobGraph -> tryRunRecoveredJobGraph(jobGraph, dispatcherId).thenAcceptAsync(
                            FunctionUtils.uncheckedConsumer((Boolean isRecoveredJobRunning) -> {
                                    if (!isRecoveredJobRunning) {
                                        submittedJobGraphStore.releaseJobGraph(jobId);
                                    }
                                }),
                            getRpcService().getExecutor())))
                        .orElse(CompletableFuture.completedFuture(null)),
                    getUnfencedMainThreadExecutor());

                submissionFuture.whenComplete(
                    (Void ignored, Throwable throwable) -> {
                        if (throwable != null) {
                            onFatalError(
                                new DispatcherException(
                                    String.format("Could not start the added job %s", jobId),
                                    ExceptionUtils.stripCompletionException(throwable)));
                        }
                    });

                recoveryOperation = submissionFuture;
            }
        });
}

private CompletableFuture<Boolean> tryRunRecoveredJobGraph(JobGraph jobGraph, DispatcherId dispatcherId) throws Exception {
    if (leaderElectionService.hasLeadership(dispatcherId.toUUID())) {
        final JobID jobId = jobGraph.getJobID();
        if (jobManagerRunnerFutures.containsKey(jobId)) {
            // we must not release the job graph lock since it can only be locked once and
            // is currently being executed. Once we support multiple locks, we must release
            // the JobGraph here
            log.debug("Ignore added JobGraph because the job {} is already running.", jobId);
            return CompletableFuture.completedFuture(true);
        } else if (runningJobsRegistry.getJobSchedulingStatus(jobId) != RunningJobsRegistry.JobSchedulingStatus.DONE) {
            return waitForTerminatingJobManager(jobId, jobGraph, this::runJob).thenApply(ignored -> true);
        } else {
            log.debug("Ignore added JobGraph because the job {} has already been completed.", jobId);
        }
    }

    return CompletableFuture.completedFuture(false);
}

@Override
public void onRemovedJobGraph(final JobID jobId) {
    runAsync(() -> {
        try {
            removeJobAndRegisterTerminationFuture(jobId, false);
        } catch (final Exception e) {
            onFatalError(new DispatcherException(String.format("Could not remove job %s.", jobId), e));
        }
    });
}
}

As the code shows, there are Add and Remove callbacks for the job graph (onAddedJobGraph, onRemovedJobGraph), among others; these are the actions through which the graph gets distributed. Their internal call flow will be analyzed further in a later article.
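To make the threading pattern behind these callbacks easier to follow, here is a minimal, self-contained sketch using only the JDK's CompletableFuture (it is not Flink code): the potentially slow recovery work runs on an I/O thread pool, while the decision whether to actually run the job is taken on a single "main thread" executor, mirroring the roles of getRpcService().getExecutor() and getMainThreadExecutor() in the Dispatcher code above. All names in the sketch (ioExecutor, mainThread, recoverJob, runJob) are illustrative placeholders, not Flink APIs.

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RecoveryChainSketch {

    // stands in for getRpcService().getExecutor(): a pool for blocking I/O such as reading a job graph from the store
    private static final ExecutorService ioExecutor = Executors.newFixedThreadPool(2);

    // stands in for the dispatcher's main-thread executor: all bookkeeping happens on this single thread
    private static final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    public static void main(String[] args) {
        String jobId = "job-42";

        // step 1: recover the (possibly already removed) job off the main thread
        CompletableFuture<Optional<String>> recoveredJob =
            CompletableFuture.supplyAsync(() -> recoverJob(jobId), ioExecutor);

        // step 2: decide on the main thread whether to run it; an empty Optional means a false positive
        CompletableFuture<Void> submissionFuture = recoveredJob.thenAcceptAsync(
            jobGraphOptional -> {
                if (jobGraphOptional.isPresent()) {
                    runJob(jobGraphOptional.get());
                } else {
                    System.out.println("Job " + jobId + " was already removed, nothing to run.");
                }
            },
            mainThread);

        // step 3: any failure in the chain surfaces here, analogous to onFatalError
        submissionFuture.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                System.err.println("Could not start the added job " + jobId + ": " + throwable);
            }
        });

        submissionFuture.join();
        ioExecutor.shutdown();
        mainThread.shutdown();
    }

    // pretend to read a serialized job graph from persistent storage
    private static Optional<String> recoverJob(String jobId) {
        return Optional.of("JobGraph(" + jobId + ")");
    }

    private static void runJob(String jobGraph) {
        System.out.println("Starting " + jobGraph + " on " + Thread.currentThread().getName());
    }
}

The essential design choice this illustrates is that the Dispatcher never blocks its main thread on recovery I/O, yet every decision that touches its internal maps (such as jobManagerRunnerFutures) is serialized onto one thread, so no explicit locking is needed.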

四、Summary

Through the analysis above, we now have a basic picture of how a job graph is obtained and distributed in Flink. The concrete generation process will be analyzed as part of the StreamGraph-to-JobGraph conversion, so it is not expanded on in detail here.
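For reference, the client-side entry point of that conversion can be sketched as follows. This is only a minimal illustration assuming the public APIs of Flink around version 1.7 (StreamExecutionEnvironment#getStreamGraph and StreamGraph#getJobGraph); the trivial pipeline exists only so that the graph is not empty.

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class StreamToJobGraphSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // a trivial pipeline, just so the StreamGraph contains something
        env.fromElements(1, 2, 3).print();

        // user program -> StreamGraph -> JobGraph; this JobGraph is what gets submitted to the Dispatcher
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = streamGraph.getJobGraph();

        System.out.println("Job " + jobGraph.getJobID()
            + " contains " + jobGraph.getNumberOfVertices() + " vertices.");
    }
}

The JobGraph obtained this way is exactly the object that the Dispatcher code above recovers, archives and distributes.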

