A closer look at the jvm-exit-on-oom configuration of Flink's TaskManager
Published: 2019-06-18


This article takes a look at the jvm-exit-on-oom configuration of the Flink TaskManager.

taskmanager.jvm-exit-on-oom

flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java

```java
@PublicEvolving
public class TaskManagerOptions {

	//......

	/**
	 * Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.
	 */
	public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
			key("taskmanager.jvm-exit-on-oom")
			.defaultValue(false)
			.withDescription("Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.");

	//......
}
```
  • taskmanager.jvm-exit-on-oom defaults to false; it specifies whether the TaskManager should be killed when a task thread throws an OutOfMemoryError.
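As a rough illustration of what a boolean option with a `false` default means in practice, the dependency-free sketch below models the option as a key plus a default, and the configuration as a plain `Map<String, String>` (roughly what a `flink-conf.yaml` entry such as `taskmanager.jvm-exit-on-oom: true` contributes). `BooleanOptionSketch` and `BoolOption` are hypothetical names, not Flink's `ConfigOption`/`Configuration` API, and parsing via `Boolean.parseBoolean` is an assumption that may differ from how Flink itself parses values.

```java
import java.util.HashMap;
import java.util.Map;

public class BooleanOptionSketch {

    /** Hypothetical stand-in for a typed config option: a key plus a default value. */
    static final class BoolOption {
        final String key;
        final boolean defaultValue;

        BoolOption(String key, boolean defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }

        /** Returns the configured value, or the default when the key is absent. */
        boolean read(Map<String, String> conf) {
            String raw = conf.get(key);
            // Assumption: only "true" (case-insensitive, after trimming) counts as true
            return raw == null ? defaultValue : Boolean.parseBoolean(raw.trim());
        }
    }

    // Same key and default as TaskManagerOptions.KILL_ON_OUT_OF_MEMORY
    static final BoolOption KILL_ON_OUT_OF_MEMORY =
            new BoolOption("taskmanager.jvm-exit-on-oom", false);

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(KILL_ON_OUT_OF_MEMORY.read(conf)); // false: key absent, default applies

        conf.put("taskmanager.jvm-exit-on-oom", "true");
        System.out.println(KILL_ON_OUT_OF_MEMORY.read(conf)); // true: explicitly enabled
    }
}
```

With the default in place, an OutOfMemoryError in a task only fails that task; the TaskManager process keeps running unless the key is explicitly set to true.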

TaskManagerConfiguration

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java

```java
public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {

	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerConfiguration.class);

	private final int numberSlots;

	private final String[] tmpDirectories;

	private final Time timeout;

	// null indicates an infinite duration
	@Nullable
	private final Time maxRegistrationDuration;

	private final Time initialRegistrationPause;
	private final Time maxRegistrationPause;
	private final Time refusedRegistrationPause;

	private final UnmodifiableConfiguration configuration;

	private final boolean exitJvmOnOutOfMemory;

	private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;

	private final String[] alwaysParentFirstLoaderPatterns;

	@Nullable
	private final String taskManagerLogPath;

	@Nullable
	private final String taskManagerStdoutPath;

	public TaskManagerConfiguration(
		int numberSlots,
		String[] tmpDirectories,
		Time timeout,
		@Nullable Time maxRegistrationDuration,
		Time initialRegistrationPause,
		Time maxRegistrationPause,
		Time refusedRegistrationPause,
		Configuration configuration,
		boolean exitJvmOnOutOfMemory,
		FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
		String[] alwaysParentFirstLoaderPatterns,
		@Nullable String taskManagerLogPath,
		@Nullable String taskManagerStdoutPath) {

		this.numberSlots = numberSlots;
		this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
		this.timeout = Preconditions.checkNotNull(timeout);
		this.maxRegistrationDuration = maxRegistrationDuration;
		this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
		this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause);
		this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
		this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
		this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
		this.classLoaderResolveOrder = classLoaderResolveOrder;
		this.alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderPatterns;
		this.taskManagerLogPath = taskManagerLogPath;
		this.taskManagerStdoutPath = taskManagerStdoutPath;
	}

	public int getNumberSlots() {
		return numberSlots;
	}

	public Time getTimeout() {
		return timeout;
	}

	@Nullable
	public Time getMaxRegistrationDuration() {
		return maxRegistrationDuration;
	}

	public Time getInitialRegistrationPause() {
		return initialRegistrationPause;
	}

	@Nullable
	public Time getMaxRegistrationPause() {
		return maxRegistrationPause;
	}

	public Time getRefusedRegistrationPause() {
		return refusedRegistrationPause;
	}

	@Override
	public Configuration getConfiguration() {
		return configuration;
	}

	@Override
	public String[] getTmpDirectories() {
		return tmpDirectories;
	}

	@Override
	public boolean shouldExitJvmOnOutOfMemoryError() {
		return exitJvmOnOutOfMemory;
	}

	public FlinkUserCodeClassLoaders.ResolveOrder getClassLoaderResolveOrder() {
		return classLoaderResolveOrder;
	}

	public String[] getAlwaysParentFirstLoaderPatterns() {
		return alwaysParentFirstLoaderPatterns;
	}

	@Nullable
	public String getTaskManagerLogPath() {
		return taskManagerLogPath;
	}

	@Nullable
	public String getTaskManagerStdoutPath() {
		return taskManagerStdoutPath;
	}

	// --------------------------------------------------------------------------------------------
	//  Static factory methods
	// --------------------------------------------------------------------------------------------

	public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
		int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

		if (numberSlots == -1) {
			numberSlots = 1;
		}

		final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(configuration);

		final Time timeout;

		try {
			timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
		} catch (Exception e) {
			throw new IllegalArgumentException(
				"Invalid format for '" + AkkaOptions.ASK_TIMEOUT.key() +
					"'.Use formats like '50 s' or '1 min' to specify the timeout.");
		}

		LOG.info("Messages have a max timeout of " + timeout);

		final Time finiteRegistrationDuration;
		try {
			Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.REGISTRATION_TIMEOUT));
			if (maxRegistrationDuration.isFinite()) {
				finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
			} else {
				finiteRegistrationDuration = null;
			}
		} catch (NumberFormatException e) {
			throw new IllegalArgumentException("Invalid format for parameter " +
				TaskManagerOptions.REGISTRATION_TIMEOUT.key(), e);
		}

		final Time initialRegistrationPause;
		try {
			Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF));
			if (pause.isFinite()) {
				initialRegistrationPause = Time.milliseconds(pause.toMillis());
			} else {
				throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
			}
		} catch (NumberFormatException e) {
			throw new IllegalArgumentException("Invalid format for parameter " +
				TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
		}

		final Time maxRegistrationPause;
		try {
			Duration pause = Duration.create(configuration.getString(
				TaskManagerOptions.REGISTRATION_MAX_BACKOFF));
			if (pause.isFinite()) {
				maxRegistrationPause = Time.milliseconds(pause.toMillis());
			} else {
				throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
			}
		} catch (NumberFormatException e) {
			throw new IllegalArgumentException("Invalid format for parameter " +
				TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
		}

		final Time refusedRegistrationPause;
		try {
			Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF));
			if (pause.isFinite()) {
				refusedRegistrationPause = Time.milliseconds(pause.toMillis());
			} else {
				throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
			}
		} catch (NumberFormatException e) {
			throw new IllegalArgumentException("Invalid format for parameter " +
				TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
		}

		final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);

		final String classLoaderResolveOrder =
			configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);

		final String[] alwaysParentFirstLoaderPatterns = CoreOptions.getParentFirstLoaderPatterns(configuration);

		final String taskManagerLogPath = configuration.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file"));
		final String taskManagerStdoutPath;

		if (taskManagerLogPath != null) {
			final int extension = taskManagerLogPath.lastIndexOf('.');

			if (extension > 0) {
				taskManagerStdoutPath = taskManagerLogPath.substring(0, extension) + ".out";
			} else {
				taskManagerStdoutPath = null;
			}
		} else {
			taskManagerStdoutPath = null;
		}

		return new TaskManagerConfiguration(
			numberSlots,
			tmpDirPaths,
			timeout,
			finiteRegistrationDuration,
			initialRegistrationPause,
			maxRegistrationPause,
			refusedRegistrationPause,
			configuration,
			exitOnOom,
			FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
			alwaysParentFirstLoaderPatterns,
			taskManagerLogPath,
			taskManagerStdoutPath);
	}
}
```
  • The static factory method fromConfiguration of TaskManagerConfiguration reads exitOnOom via configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY) and passes it to the constructor, which stores it in the exitJvmOnOutOfMemory field; the shouldExitJvmOnOutOfMemoryError method exposes that field to callers.
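The "parse once in a static factory, then expose through getters" pattern can be sketched without Flink as follows. This is a simplified, hypothetical `TaskManagerConfigSketch`: the configuration is assumed to be a `Map<String, String>`, the log path is passed in directly instead of being read from `ConfigConstants.TASK_MANAGER_LOG_PATH_KEY` or the `log.file` system property, and only the OOM flag and the log/stdout paths are modeled. The `.out` derivation follows the same `lastIndexOf('.')` rule as `fromConfiguration`.

```java
import java.util.HashMap;
import java.util.Map;

public final class TaskManagerConfigSketch {

    private final boolean exitJvmOnOutOfMemory;
    private final String taskManagerLogPath;    // may be null
    private final String taskManagerStdoutPath; // may be null

    private TaskManagerConfigSketch(boolean exitJvmOnOutOfMemory, String logPath, String stdoutPath) {
        this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
        this.taskManagerLogPath = logPath;
        this.taskManagerStdoutPath = stdoutPath;
    }

    /** Static factory: parse raw settings once, then freeze them in an immutable object. */
    static TaskManagerConfigSketch fromMap(Map<String, String> conf, String logPath) {
        boolean exitOnOom = Boolean.parseBoolean(
                conf.getOrDefault("taskmanager.jvm-exit-on-oom", "false"));
        return new TaskManagerConfigSketch(exitOnOom, logPath, deriveStdoutPath(logPath));
    }

    /** Replaces the part after the last '.' with "out"; null if there is no '.' after index 0. */
    static String deriveStdoutPath(String logPath) {
        if (logPath == null) {
            return null;
        }
        int extension = logPath.lastIndexOf('.');
        return extension > 0 ? logPath.substring(0, extension) + ".out" : null;
    }

    boolean shouldExitJvmOnOutOfMemoryError() { return exitJvmOnOutOfMemory; }

    String getTaskManagerLogPath() { return taskManagerLogPath; }

    String getTaskManagerStdoutPath() { return taskManagerStdoutPath; }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("taskmanager.jvm-exit-on-oom", "true");
        TaskManagerConfigSketch cfg = fromMap(conf, "/tmp/flink/taskmanager.log");
        System.out.println(cfg.shouldExitJvmOnOutOfMemoryError()); // true
        System.out.println(cfg.getTaskManagerStdoutPath());        // /tmp/flink/taskmanager.out
    }
}
```

Keeping the fields final means the task-side code can call shouldExitJvmOnOutOfMemoryError() from any thread without further synchronization.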

Task

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java

```java
public class Task implements Runnable, TaskActions, CheckpointListener {

	//......

	@Override
	public void run() {

		// ----------------------------
		//  Initial State transition
		// ----------------------------
		//......

		// all resource acquisitions and registrations from here on
		// need to be undone in the end
		Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
		AbstractInvokable invokable = null;

		try {
			//......

			// ----------------------------------------------------------------
			//  call the user code initialization methods
			// ----------------------------------------------------------------

			TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());

			Environment env = new RuntimeEnvironment(
				jobId,
				vertexId,
				executionId,
				executionConfig,
				taskInfo,
				jobConfiguration,
				taskConfiguration,
				userCodeClassLoader,
				memoryManager,
				ioManager,
				broadcastVariableManager,
				taskStateManager,
				accumulatorRegistry,
				kvStateRegistry,
				inputSplitProvider,
				distributedCacheEntries,
				producedPartitions,
				inputGates,
				network.getTaskEventDispatcher(),
				checkpointResponder,
				taskManagerConfig,
				metrics,
				this);

			// now load and instantiate the task's invokable code
			invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

			// ----------------------------------------------------------------
			//  actual task core work
			// ----------------------------------------------------------------

			// we must make strictly sure that the invokable is accessible to the cancel() call
			// by the time we switched to running.
			this.invokable = invokable;

			// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
			if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
				throw new CancelTaskException();
			}

			// notify everyone that we switched to running
			taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

			// make sure the user code classloader is accessible thread-locally
			executingThread.setContextClassLoader(userCodeClassLoader);

			// run the invokable
			invokable.invoke();

			// make sure, we enter the catch block if the task leaves the invoke() method due
			// to the fact that it has been canceled
			if (isCanceledOrFailed()) {
				throw new CancelTaskException();
			}

			// ----------------------------------------------------------------
			//  finalization of a successful execution
			// ----------------------------------------------------------------

			// finish the produced partitions. if this fails, we consider the execution failed.
			for (ResultPartition partition : producedPartitions) {
				if (partition != null) {
					partition.finish();
				}
			}

			// try to mark the task as finished
			// if that fails, the task was canceled/failed in the meantime
			if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
				throw new CancelTaskException();
			}
		}
		catch (Throwable t) {

			// unwrap wrapped exceptions to make stack traces more compact
			if (t instanceof WrappingRuntimeException) {
				t = ((WrappingRuntimeException) t).unwrap();
			}

			// ----------------------------------------------------------------
			// the execution failed. either the invokable code properly failed, or
			// an exception was thrown as a side effect of cancelling
			// ----------------------------------------------------------------

			try {
				// check if the exception is unrecoverable
				if (ExceptionUtils.isJvmFatalError(t) ||
						(t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError())) {

					// terminate the JVM immediately
					// don't attempt a clean shutdown, because we cannot expect the clean shutdown to complete
					try {
						LOG.error("Encountered fatal error {} - terminating the JVM", t.getClass().getName(), t);
					} finally {
						Runtime.getRuntime().halt(-1);
					}
				}

				// transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
				// loop for multiple retries during concurrent state changes via calls to cancel() or
				// to failExternally()
				while (true) {
					ExecutionState current = this.executionState;

					if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
						if (t instanceof CancelTaskException) {
							if (transitionState(current, ExecutionState.CANCELED)) {
								cancelInvokable(invokable);
								break;
							}
						}
						else {
							if (transitionState(current, ExecutionState.FAILED, t)) {
								// proper failure of the task. record the exception as the root cause
								failureCause = t;
								cancelInvokable(invokable);

								break;
							}
						}
					}
					else if (current == ExecutionState.CANCELING) {
						if (transitionState(current, ExecutionState.CANCELED)) {
							break;
						}
					}
					else if (current == ExecutionState.FAILED) {
						// in state failed already, no transition necessary any more
						break;
					}
					// unexpected state, go to failed
					else if (transitionState(current, ExecutionState.FAILED, t)) {
						LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current);
						break;
					}
					// else fall through the loop and
				}
			}
			catch (Throwable tt) {
				String message = String.format("FATAL - exception in exception handler of task %s (%s).",
					taskNameWithSubtask, executionId);
				LOG.error(message, tt);
				notifyFatalError(message, tt);
			}
		}
		finally {
			//......
		}
	}

	//......
}
```
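  • Task implements the Runnable interface. Its run method wraps invokable.invoke() in a try/catch. In the catch block it checks whether ExceptionUtils.isJvmFatalError(t) is true or whether (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()) holds; if so, it logs the error and calls Runtime.getRuntime().halt(-1) to stop the JVM immediately, without attempting a clean shutdown. Otherwise it moves the task into its final state (CANCELED or FAILED).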
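To look at the catch-block decision in isolation, the sketch below turns it into plain functions. `TaskFailureSketch` and `onTaskFailure` are hypothetical names; the `log` and `halter` parameters stand in for `LOG.error` and `Runtime.getRuntime().halt`, so the logic can be exercised without actually killing the JVM. The real `halt` never returns, whereas the injected `halter` does, which is why the sketch returns a boolean instead.

```java
import java.util.function.Consumer;
import java.util.function.IntConsumer;

public final class TaskFailureSketch {

    /** Same three checks as Flink 1.7.2's ExceptionUtils.isJvmFatalError. */
    static boolean isJvmFatalError(Throwable t) {
        return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
    }

    /** The condition Task#run evaluates in its catch block. */
    static boolean isUnrecoverable(Throwable t, boolean exitJvmOnOutOfMemory) {
        return isJvmFatalError(t) || (t instanceof OutOfMemoryError && exitJvmOnOutOfMemory);
    }

    /** Hypothetical, testable form of the catch block; returns true if the halter was invoked. */
    static boolean onTaskFailure(Throwable t, boolean exitJvmOnOutOfMemory,
                                 Consumer<String> log, IntConsumer halter) {
        if (!isUnrecoverable(t, exitJvmOnOutOfMemory)) {
            return false; // Flink would go on to move the task to CANCELED or FAILED
        }
        try {
            log.accept("Encountered fatal error " + t.getClass().getName() + " - terminating the JVM");
        } finally {
            // Runs even if logging itself throws; the real halt(-1) never returns
            halter.accept(-1);
        }
        return true;
    }

    public static void main(String[] args) {
        int[] status = {0};
        IntConsumer recordStatus = s -> status[0] = s;
        Throwable oom = new OutOfMemoryError("Java heap space");

        System.out.println(onTaskFailure(oom, false, System.out::println, recordStatus)); // false
        System.out.println(onTaskFailure(oom, true, System.out::println, recordStatus));  // logs, then true
        System.out.println(status[0]);                                                     // -1
    }
}
```

The `try`/`finally` mirrors the original: even if writing the log entry fails, the halt is still attempted.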

ExceptionUtils.isJvmFatalError

flink-1.7.2/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java

```java
@Internal
public final class ExceptionUtils {

	//......

	/**
	 * Checks whether the given exception indicates a situation that may leave the
	 * JVM in a corrupted state, meaning a state where continued normal operation can only be
	 * guaranteed via clean process restart.
	 *
	 * <p>Currently considered fatal exceptions are Virtual Machine errors indicating
	 * that the JVM is corrupted, like {@link InternalError}, {@link UnknownError},
	 * and {@link java.util.zip.ZipError} (a special case of InternalError).
	 * The {@link ThreadDeath} exception is also treated as a fatal error, because when
	 * a thread is forcefully stopped, there is a high chance that parts of the system
	 * are in an inconsistent state.
	 *
	 * @param t The exception to check.
	 * @return True, if the exception is considered fatal to the JVM, false otherwise.
	 */
	public static boolean isJvmFatalError(Throwable t) {
		return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
	}

	//......
}
```

  • isJvmFatalError returns true if the Throwable is an InternalError, an UnknownError, or a ThreadDeath (subclasses included, since the check uses instanceof), and false otherwise.
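Because the check relies on `instanceof`, subclasses match as well: `java.util.zip.ZipError` extends `InternalError`, which is why the Javadoc calls it a special case. Conversely, `OutOfMemoryError` and `StackOverflowError` are also `VirtualMachineError`s, yet neither is matched. An OOM therefore only halts the TaskManager through the separate `taskmanager.jvm-exit-on-oom` branch in `Task#run`, and a `StackOverflowError` simply fails the task. The hypothetical `FatalErrorCheckSketch` below copies the three checks (without any Flink dependency) to make this visible.

```java
import java.util.List;
import java.util.zip.ZipError;

public final class FatalErrorCheckSketch {

    /** Same logic as Flink 1.7.2's ExceptionUtils.isJvmFatalError. */
    static boolean isJvmFatalError(Throwable t) {
        return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
    }

    public static void main(String[] args) {
        List<Throwable> samples = List.of(
                new InternalError("internal"),
                new ZipError("corrupt zip"),       // subclass of InternalError, so it matches
                new UnknownError("unknown"),
                new OutOfMemoryError("heap"),      // a VirtualMachineError, but not matched
                new StackOverflowError(),          // likewise not matched
                new IllegalStateException("ordinary"));
        for (Throwable t : samples) {
            System.out.printf("%-40s fatal=%b%n", t.getClass().getName(), isJvmFatalError(t));
        }
    }
}
```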

Runtime.getRuntime().halt

java.base/java/lang/Runtime.java

```java
public class Runtime {

    //......

    private static final Runtime currentRuntime = new Runtime();

    /**
     * Returns the runtime object associated with the current Java application.
     * Most of the methods of class {@code Runtime} are instance
     * methods and must be invoked with respect to the current runtime object.
     *
     * @return  the {@code Runtime} object associated with the current
     *          Java application.
     */
    public static Runtime getRuntime() {
        return currentRuntime;
    }

    /**
     * Forcibly terminates the currently running Java virtual machine.  This
     * method never returns normally.
     *
     * <p> This method should be used with extreme caution.  Unlike the
     * {@link #exit exit} method, this method does not cause shutdown
     * hooks to be started.  If the shutdown sequence has already been
     * initiated then this method does not wait for any running
     * shutdown hooks to finish their work.
     *
     * @param  status
     *         Termination status. By convention, a nonzero status code
     *         indicates abnormal termination. If the {@link Runtime#exit exit}
     *         (equivalently, {@link System#exit(int) System.exit}) method
     *         has already been invoked then this status code
     *         will override the status code passed to that method.
     *
     * @throws SecurityException
     *         If a security manager is present and its
     *         {@link SecurityManager#checkExit checkExit} method
     *         does not permit an exit with the specified status
     *
     * @see #exit
     * @see #addShutdownHook
     * @see #removeShutdownHook
     * @since 1.3
     */
    public void halt(int status) {
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            sm.checkExit(status);
        }
        Shutdown.beforeHalt();
        Shutdown.halt(status);
    }

    //......
}
```

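  • When a SecurityManager is present (not null), halt first calls SecurityManager.checkExit; it then calls Shutdown.beforeHalt() and Shutdown.halt(status) to stop the JVM. Unlike exit, halt does not start shutdown hooks.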

Summary

  • taskmanager.jvm-exit-on-oom defaults to false; it specifies whether the TaskManager should be killed when a task thread throws an OutOfMemoryError.
  • The static factory method fromConfiguration of TaskManagerConfiguration reads exitOnOom via configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY) and passes it to the constructor, which stores it in the exitJvmOnOutOfMemory field; the shouldExitJvmOnOutOfMemoryError method exposes that field to callers.
  • Task implements the Runnable interface. Its run method wraps invokable.invoke() in a try/catch, and in the catch block, if ExceptionUtils.isJvmFatalError(t) is true or (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()) holds, it calls Runtime.getRuntime().halt(-1) to stop the JVM.
  • isJvmFatalError returns true if the Throwable is an InternalError, an UnknownError, or a ThreadDeath.
  • When a SecurityManager is present, halt calls SecurityManager.checkExit; it then calls Shutdown.beforeHalt() and Shutdown.halt(status) to stop the JVM.

Reposted from: https://my.oschina.net/go4it/blog/3013884
