Preface
This article takes a look at the jvm-exit-on-oom configuration of the Flink TaskManager.
taskmanager.jvm-exit-on-oom
flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
    @PublicEvolving
    public class TaskManagerOptions {
        //......

        /**
         * Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.
         */
        public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
                key("taskmanager.jvm-exit-on-oom")
                .defaultValue(false)
                .withDescription("Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.");

        //......
    }
- The taskmanager.jvm-exit-on-oom option defaults to false. It controls whether the TaskManager is killed when a task thread throws an OutOfMemoryError.
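To turn the behaviour on, the key is set in the Flink configuration file. The fragment below is a minimal sketch, assuming the standard conf/flink-conf.yaml of a 1.7.x distribution; only the key name and its default come from the source above.

```yaml
# conf/flink-conf.yaml
# Terminate the TaskManager JVM when a task thread throws OutOfMemoryError (default: false)
taskmanager.jvm-exit-on-oom: true
```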
TaskManagerConfiguration
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
    public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {

        private static final Logger LOG = LoggerFactory.getLogger(TaskManagerConfiguration.class);

        private final int numberSlots;
        private final String[] tmpDirectories;
        private final Time timeout;

        // null indicates an infinite duration
        @Nullable
        private final Time maxRegistrationDuration;

        private final Time initialRegistrationPause;
        private final Time maxRegistrationPause;
        private final Time refusedRegistrationPause;

        private final UnmodifiableConfiguration configuration;

        private final boolean exitJvmOnOutOfMemory;

        private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;

        private final String[] alwaysParentFirstLoaderPatterns;

        @Nullable
        private final String taskManagerLogPath;

        @Nullable
        private final String taskManagerStdoutPath;

        public TaskManagerConfiguration(
            int numberSlots,
            String[] tmpDirectories,
            Time timeout,
            @Nullable Time maxRegistrationDuration,
            Time initialRegistrationPause,
            Time maxRegistrationPause,
            Time refusedRegistrationPause,
            Configuration configuration,
            boolean exitJvmOnOutOfMemory,
            FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
            String[] alwaysParentFirstLoaderPatterns,
            @Nullable String taskManagerLogPath,
            @Nullable String taskManagerStdoutPath) {

            this.numberSlots = numberSlots;
            this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
            this.timeout = Preconditions.checkNotNull(timeout);
            this.maxRegistrationDuration = maxRegistrationDuration;
            this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
            this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause);
            this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
            this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
            this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
            this.classLoaderResolveOrder = classLoaderResolveOrder;
            this.alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderPatterns;
            this.taskManagerLogPath = taskManagerLogPath;
            this.taskManagerStdoutPath = taskManagerStdoutPath;
        }

        public int getNumberSlots() {
            return numberSlots;
        }

        public Time getTimeout() {
            return timeout;
        }

        @Nullable
        public Time getMaxRegistrationDuration() {
            return maxRegistrationDuration;
        }

        public Time getInitialRegistrationPause() {
            return initialRegistrationPause;
        }

        @Nullable
        public Time getMaxRegistrationPause() {
            return maxRegistrationPause;
        }

        public Time getRefusedRegistrationPause() {
            return refusedRegistrationPause;
        }

        @Override
        public Configuration getConfiguration() {
            return configuration;
        }

        @Override
        public String[] getTmpDirectories() {
            return tmpDirectories;
        }

        @Override
        public boolean shouldExitJvmOnOutOfMemoryError() {
            return exitJvmOnOutOfMemory;
        }

        public FlinkUserCodeClassLoaders.ResolveOrder getClassLoaderResolveOrder() {
            return classLoaderResolveOrder;
        }

        public String[] getAlwaysParentFirstLoaderPatterns() {
            return alwaysParentFirstLoaderPatterns;
        }

        @Nullable
        public String getTaskManagerLogPath() {
            return taskManagerLogPath;
        }

        @Nullable
        public String getTaskManagerStdoutPath() {
            return taskManagerStdoutPath;
        }

        // --------------------------------------------------------------------------------------------
        //  Static factory methods
        // --------------------------------------------------------------------------------------------

        public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
            int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

            if (numberSlots == -1) {
                numberSlots = 1;
            }

            final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(configuration);

            final Time timeout;

            try {
                timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
            } catch (Exception e) {
                throw new IllegalArgumentException(
                    "Invalid format for '" + AkkaOptions.ASK_TIMEOUT.key() +
                        "'.Use formats like '50 s' or '1 min' to specify the timeout.");
            }

            LOG.info("Messages have a max timeout of " + timeout);

            final Time finiteRegistrationDuration;

            try {
                Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.REGISTRATION_TIMEOUT));
                if (maxRegistrationDuration.isFinite()) {
                    finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
                } else {
                    finiteRegistrationDuration = null;
                }
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid format for parameter " +
                    TaskManagerOptions.REGISTRATION_TIMEOUT.key(), e);
            }

            final Time initialRegistrationPause;
            try {
                Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF));
                if (pause.isFinite()) {
                    initialRegistrationPause = Time.milliseconds(pause.toMillis());
                } else {
                    throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
                }
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid format for parameter " +
                    TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
            }

            final Time maxRegistrationPause;
            try {
                Duration pause = Duration.create(configuration.getString(
                    TaskManagerOptions.REGISTRATION_MAX_BACKOFF));
                if (pause.isFinite()) {
                    maxRegistrationPause = Time.milliseconds(pause.toMillis());
                } else {
                    throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
                }
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid format for parameter " +
                    TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
            }

            final Time refusedRegistrationPause;
            try {
                Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF));
                if (pause.isFinite()) {
                    refusedRegistrationPause = Time.milliseconds(pause.toMillis());
                } else {
                    throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
                }
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid format for parameter " +
                    TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
            }

            final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);

            final String classLoaderResolveOrder =
                configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);

            final String[] alwaysParentFirstLoaderPatterns = CoreOptions.getParentFirstLoaderPatterns(configuration);

            final String taskManagerLogPath = configuration.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file"));
            final String taskManagerStdoutPath;

            if (taskManagerLogPath != null) {
                final int extension = taskManagerLogPath.lastIndexOf('.');

                if (extension > 0) {
                    taskManagerStdoutPath = taskManagerLogPath.substring(0, extension) + ".out";
                } else {
                    taskManagerStdoutPath = null;
                }
            } else {
                taskManagerStdoutPath = null;
            }

            return new TaskManagerConfiguration(
                numberSlots,
                tmpDirPaths,
                timeout,
                finiteRegistrationDuration,
                initialRegistrationPause,
                maxRegistrationPause,
                refusedRegistrationPause,
                configuration,
                exitOnOom,
                FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
                alwaysParentFirstLoaderPatterns,
                taskManagerLogPath,
                taskManagerStdoutPath);
        }
    }
- The static factory method TaskManagerConfiguration.fromConfiguration reads exitOnOom via configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY) and passes it to the constructor, which stores it in the exitJvmOnOutOfMemory field. The shouldExitJvmOnOutOfMemoryError method exposes that field.
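The lookup-with-default behaviour can be illustrated without pulling in Flink. The following is a simplified stand-in, not Flink's actual Configuration class: the class ExitOnOomLookupSketch and the helper readExitOnOom are hypothetical names, and a plain Map is assumed in place of the real configuration object. Only the key and the default value are taken from TaskManagerOptions.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for the option lookup; these are not Flink's classes. */
class ExitOnOomLookupSketch {

    // Key and default as declared in TaskManagerOptions.KILL_ON_OUT_OF_MEMORY.
    static final String KEY = "taskmanager.jvm-exit-on-oom";
    static final boolean DEFAULT = false;

    /** Hypothetical helper: returns the configured value, or the default when the key is absent. */
    public static boolean readExitOnOom(Map<String, String> conf) {
        String raw = conf.get(KEY);
        return raw == null ? DEFAULT : Boolean.parseBoolean(raw.trim());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Key absent: the lookup falls back to the default.
        System.out.println(readExitOnOom(conf));

        // Key set explicitly: the configured value wins.
        conf.put(KEY, "true");
        System.out.println(readExitOnOom(conf));
    }
}
```

In Flink itself the resulting boolean is what ends up in the exitJvmOnOutOfMemory field and is later returned by shouldExitJvmOnOutOfMemoryError.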
Task
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
    public class Task implements Runnable, TaskActions, CheckpointListener {
        //......

        @Override
        public void run() {

            // ----------------------------
            //  Initial State transition
            // ----------------------------
            //......

            // all resource acquisitions and registrations from here on
            // need to be undone in the end
            Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
            AbstractInvokable invokable = null;

            try {
                //......

                // ----------------------------------------------------------------
                //  call the user code initialization methods
                // ----------------------------------------------------------------

                TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());

                Environment env = new RuntimeEnvironment(
                    jobId,
                    vertexId,
                    executionId,
                    executionConfig,
                    taskInfo,
                    jobConfiguration,
                    taskConfiguration,
                    userCodeClassLoader,
                    memoryManager,
                    ioManager,
                    broadcastVariableManager,
                    taskStateManager,
                    accumulatorRegistry,
                    kvStateRegistry,
                    inputSplitProvider,
                    distributedCacheEntries,
                    producedPartitions,
                    inputGates,
                    network.getTaskEventDispatcher(),
                    checkpointResponder,
                    taskManagerConfig,
                    metrics,
                    this);

                // now load and instantiate the task's invokable code
                invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

                // ----------------------------------------------------------------
                //  actual task core work
                // ----------------------------------------------------------------

                // we must make strictly sure that the invokable is accessible to the cancel() call
                // by the time we switched to running.
                this.invokable = invokable;

                // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
                if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                    throw new CancelTaskException();
                }

                // notify everyone that we switched to running
                taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

                // make sure the user code classloader is accessible thread-locally
                executingThread.setContextClassLoader(userCodeClassLoader);

                // run the invokable
                invokable.invoke();

                // make sure, we enter the catch block if the task leaves the invoke() method due
                // to the fact that it has been canceled
                if (isCanceledOrFailed()) {
                    throw new CancelTaskException();
                }

                // ----------------------------------------------------------------
                //  finalization of a successful execution
                // ----------------------------------------------------------------

                // finish the produced partitions. if this fails, we consider the execution failed.
                for (ResultPartition partition : producedPartitions) {
                    if (partition != null) {
                        partition.finish();
                    }
                }

                // try to mark the task as finished
                // if that fails, the task was canceled/failed in the meantime
                if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                    throw new CancelTaskException();
                }
            }
            catch (Throwable t) {

                // unwrap wrapped exceptions to make stack traces more compact
                if (t instanceof WrappingRuntimeException) {
                    t = ((WrappingRuntimeException) t).unwrap();
                }

                // ----------------------------------------------------------------
                // the execution failed. either the invokable code properly failed, or
                // an exception was thrown as a side effect of cancelling
                // ----------------------------------------------------------------

                try {
                    // check if the exception is unrecoverable
                    if (ExceptionUtils.isJvmFatalError(t) ||
                            (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError())) {

                        // terminate the JVM immediately
                        // don't attempt a clean shutdown, because we cannot expect the clean shutdown to complete
                        try {
                            LOG.error("Encountered fatal error {} - terminating the JVM", t.getClass().getName(), t);
                        } finally {
                            Runtime.getRuntime().halt(-1);
                        }
                    }

                    // transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
                    // loop for multiple retries during concurrent state changes via calls to cancel() or
                    // to failExternally()
                    while (true) {
                        ExecutionState current = this.executionState;

                        if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
                            if (t instanceof CancelTaskException) {
                                if (transitionState(current, ExecutionState.CANCELED)) {
                                    cancelInvokable(invokable);
                                    break;
                                }
                            }
                            else {
                                if (transitionState(current, ExecutionState.FAILED, t)) {
                                    // proper failure of the task. record the exception as the root cause
                                    failureCause = t;
                                    cancelInvokable(invokable);
                                    break;
                                }
                            }
                        }
                        else if (current == ExecutionState.CANCELING) {
                            if (transitionState(current, ExecutionState.CANCELED)) {
                                break;
                            }
                        }
                        else if (current == ExecutionState.FAILED) {
                            // in state failed already, no transition necessary any more
                            break;
                        }
                        // unexpected state, go to failed
                        else if (transitionState(current, ExecutionState.FAILED, t)) {
                            LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current);
                            break;
                        }
                        // else fall through the loop and
                    }
                }
                catch (Throwable tt) {
                    String message = String.format("FATAL - exception in exception handler of task %s (%s).", taskNameWithSubtask, executionId);
                    LOG.error(message, tt);
                    notifyFatalError(message, tt);
                }
            }
            finally {
                //......
            }
        }

        //......
    }
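- Task implements Runnable. Its run method wraps the task setup and the call to invokable.invoke() in a try/catch. In the catch block it checks whether ExceptionUtils.isJvmFatalError(t) is true, or whether (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()) holds; in either case it calls Runtime.getRuntime().halt(-1) to terminate the JVM. The log statement sits in a try block and the halt in its finally block, so the JVM is halted even if logging itself fails.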
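The decision made in the catch block can be exercised in isolation. The sketch below is an illustration under stated assumptions, not Flink code: HaltDecisionSketch and handleFailure are hypothetical names, the fatal-error check is passed in as a Predicate, and an IntConsumer stands in for Runtime.getRuntime().halt so that running the example does not actually kill the JVM.

```java
import java.util.function.IntConsumer;
import java.util.function.Predicate;

class HaltDecisionSketch {

    /**
     * Returns true if the JVM would be terminated for this throwable.
     * 'halter' is a stand-in for Runtime.getRuntime()::halt (an assumption of this sketch).
     */
    public static boolean handleFailure(
            Throwable t,
            boolean exitOnOom,
            Predicate<Throwable> isJvmFatal,
            IntConsumer halter) {

        // Same shape as the condition in Task.run(): fatal errors always qualify,
        // an OutOfMemoryError qualifies only when the option is enabled.
        boolean unrecoverable =
                isJvmFatal.test(t) || (t instanceof OutOfMemoryError && exitOnOom);

        if (unrecoverable) {
            try {
                // Logging may itself fail when the JVM is in a bad state...
                System.err.println("Encountered fatal error " + t.getClass().getName());
            } finally {
                // ...so the halt sits in finally and runs regardless.
                halter.accept(-1);
            }
            return true;
        }
        // Otherwise the task would move to FAILED or CANCELED and the JVM keeps running.
        return false;
    }

    public static void main(String[] args) {
        Predicate<Throwable> fatal = e -> e instanceof InternalError;
        IntConsumer fakeHalt = status -> System.out.println("would halt with status " + status);

        // Option disabled: an OutOfMemoryError only fails the task.
        System.out.println(handleFailure(new OutOfMemoryError("demo"), false, fatal, fakeHalt));
        // Option enabled: the same error leads to a halt.
        System.out.println(handleFailure(new OutOfMemoryError("demo"), true, fatal, fakeHalt));
    }
}
```

Injecting the halt action is purely a testing convenience of this sketch; Task.run() calls Runtime.getRuntime().halt(-1) directly.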
ExceptionUtils.isJvmFatalError
flink-1.7.2/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
    @Internal
    public final class ExceptionUtils {
        //......

        /**
         * Checks whether the given exception indicates a situation that may leave the
         * JVM in a corrupted state, meaning a state where continued normal operation can only be
         * guaranteed via clean process restart.
         *
         * <p>Currently considered fatal exceptions are Virtual Machine errors indicating
         * that the JVM is corrupted, like {@link InternalError}, {@link UnknownError},
         * and {@link java.util.zip.ZipError} (a special case of InternalError).
         * The {@link ThreadDeath} exception is also treated as a fatal error, because when
         * a thread is forcefully stopped, there is a high chance that parts of the system
         * are in an inconsistent state.
         *
         * @param t The exception to check.
         * @return True, if the exception is considered fatal to the JVM, false otherwise.
         */
        public static boolean isJvmFatalError(Throwable t) {
            return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
        }

        //......
    }
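- isJvmFatalError returns true if the Throwable is an InternalError, an UnknownError, or a ThreadDeath. OutOfMemoryError is not in this list, which is why Task.run() checks it separately against the jvm-exit-on-oom setting.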
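To see which errors the check classifies as fatal, the three instanceof tests can be copied into a standalone class. JvmFatalCheckSketch is a hypothetical name; the method body mirrors the one quoted above. The @SuppressWarnings("removal") annotation is an addition of this sketch, needed only because ThreadDeath is deprecated for removal on recent JDKs.

```java
class JvmFatalCheckSketch {

    /** Same three instanceof checks as ExceptionUtils.isJvmFatalError in Flink 1.7.2. */
    @SuppressWarnings("removal") // ThreadDeath is deprecated for removal on recent JDKs
    public static boolean isJvmFatalError(Throwable t) {
        return (t instanceof InternalError)
                || (t instanceof UnknownError)
                || (t instanceof ThreadDeath);
    }

    public static void main(String[] args) {
        Throwable[] samples = {
            new InternalError("demo"),
            new UnknownError("demo"),
            new OutOfMemoryError("demo"),
            new StackOverflowError("demo"),
            new IllegalStateException("demo")
        };
        // Print the classification for each sample error.
        for (Throwable t : samples) {
            System.out.println(t.getClass().getSimpleName() + " -> " + isJvmFatalError(t));
        }
    }
}
```

OutOfMemoryError and StackOverflowError are both VirtualMachineError subclasses, yet neither passes this check, so an OutOfMemoryError only terminates the TaskManager when taskmanager.jvm-exit-on-oom is set to true.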
Runtime.getRuntime().halt
java.base/java/lang/Runtime.java
    public class Runtime {
        //......

        private static final Runtime currentRuntime = new Runtime();

        /**
         * Returns the runtime object associated with the current Java application.
         * Most of the methods of class {@code Runtime} are instance
         * methods and must be invoked with respect to the current runtime object.
         *
         * @return  the {@code Runtime} object associated with the current
         *          Java application.
         */
        public static Runtime getRuntime() {
            return currentRuntime;
        }

        /**
         * Forcibly terminates the currently running Java virtual machine.  This
         * method never returns normally.
         *
         * <p>This method should be used with extreme caution.  Unlike the
         * {@link #exit exit} method, this method does not cause shutdown
         * hooks to be started.  If the shutdown sequence has already been
         * initiated then this method does not wait for any running
         * shutdown hooks to finish their work.
         *
         * @param  status
         *         Termination status. By convention, a nonzero status code
         *         indicates abnormal termination. If the {@link Runtime#exit exit}
         *         (equivalently, {@link System#exit(int) System.exit}) method
         *         has already been invoked then this status code
         *         will override the status code passed to that method.
         *
         * @throws SecurityException
         *         If a security manager is present and its
         *         {@link SecurityManager#checkExit checkExit} method
         *         does not permit an exit with the specified status
         *
         * @see #exit
         * @see #addShutdownHook
         * @see #removeShutdownHook
         * @since 1.3
         */
        public void halt(int status) {
            SecurityManager sm = System.getSecurityManager();
            if (sm != null) {
                sm.checkExit(status);
            }
            Shutdown.beforeHalt();
            Shutdown.halt(status);
        }

        //......
    }
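- When a SecurityManager is installed (non-null), halt first calls SecurityManager.checkExit. It then calls Shutdown.beforeHalt() and Shutdown.halt(status) to terminate the JVM. Unlike exit, halt does not start shutdown hooks.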
Summary
- The taskmanager.jvm-exit-on-oom option defaults to false. It controls whether the TaskManager is killed when a task thread throws an OutOfMemoryError.
- The static factory method TaskManagerConfiguration.fromConfiguration reads exitOnOom via configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY) and passes it to the constructor, which stores it in the exitJvmOnOutOfMemory field. The shouldExitJvmOnOutOfMemoryError method exposes that field.
- Task implements Runnable. Its run method wraps invokable.invoke() in a try/catch; in the catch block, if ExceptionUtils.isJvmFatalError(t) is true, or if (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()) holds, it calls Runtime.getRuntime().halt(-1) to terminate the JVM. isJvmFatalError returns true for InternalError, UnknownError, and ThreadDeath. When a SecurityManager is installed, halt first calls SecurityManager.checkExit, then calls Shutdown.beforeHalt() and Shutdown.halt(status) to terminate the JVM.