wait for reinforcement learning environment simulations running on workers to finish

since r2022a

syntax

wait(f)

description

wait(f) blocks the command prompt and waits for all elements of f (each corresponding to a simulation scheduled on a worker) to reach a finished state.
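
the call pattern described above can be sketched as follows. this is a minimal illustration, not a definitive recipe: it assumes that env is an environment already set up to simulate on workers and that agent is a compatible agent (both are created in the example below), and the variable names f and outs are chosen here only for illustration.

```matlab
% assumes env and agent already exist and that setup(env,UseParallel=true)
% has been called, so that runEpisode returns Future objects
for k = 1:2
    f(k) = runEpisode(env,agent,CleanupPostSim=false);
end

wait(f)                    % blocks until every element of f is finished
outs = fetchOutputs(f);    % the outputs can now be retrieved without further waiting
```

because wait returns only after all elements of f are finished, the subsequent call to fetchOutputs should not have to block.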

examples

this example shows how to use Future objects and their methods fetchNext, fetchOutputs, cancel, and wait to defer output retrieval for environment simulations running on workers, monitor the status of ongoing simulations, fetch outputs of completed simulations, cancel ongoing simulations, or wait for ongoing simulations to complete.

load a predefined environment and a suitable agent. for this example, use the environment and agent described in train ac agent to balance cart-pole system.

env = rlPredefinedEnv("CartPole-Discrete");
load("MATLABCartpoleAC.mat","agent")

start a parallel pool and set up the environment so that it simulates on workers.

pp = parpool(2);
starting parallel pool (parpool) using the 'processes' profile ...
connected to parallel pool with 2 workers.
setup(env,UseParallel=true);

to display the simulation completion times, start a timer.

tic

schedule six simulations to run on the available workers. at the beginning of each simulation, the reset function of the cart-pole environment sets the initial angle of the pole to a random position in the neighborhood of zero (the upward position). this randomization ensures that each simulation is different.

for i=1:6  
    ftr(i) = runEpisode(env,agent,CleanupPostSim=false);
end

each element of the future array ftr represents a scheduled simulation.

ftr
ftr=1×6 object
  1×6 Future array with properties:
    Read
    State
    Diary
    ID

display the state of each simulation.

ftr.State
ans = 
'running'
ans = 
'running'
ans = 
'queued'
ans = 
'queued'
ans = 
'queued'
ans = 
'queued'

two simulations are ongoing while the others are queued.
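
for larger arrays, the per-element display above can be condensed into a summary. the following is a minimal sketch, assuming that ftr is the array of Future objects created above; the variable names states, nRunning, and nQueued are illustrative only.

```matlab
% assumes ftr is the array of Future objects created above
states = string({ftr.State});            % one state per scheduled simulation
nRunning = nnz(states == "running");     % simulations currently on a worker
nQueued = nnz(states == "queued");       % simulations waiting for a free worker
fprintf("%d running, %d queued\n",nRunning,nQueued)
```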

use fetchNext with a timeout of 0.1 seconds to retrieve results for simulations that complete within that time (if any).

[idx,out] = fetchNext(ftr,0.1)
idx =
     []
out =
     []

both outputs are empty, which means that none of the six simulations has completed yet.

display which simulation outputs have already been retrieved.

ftr.Read
ans = logical
   0
ans = logical
   0
ans = logical
   0
ans = logical
   0
ans = logical
   0
ans = logical
   0
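
to obtain a single count rather than one flag per element, the Read values can be concatenated. this is a minimal sketch, assuming that ftr is the array of Future objects created above; readFlags and nRetrieved are illustrative names.

```matlab
% assumes ftr is the array of Future objects created above
readFlags = [ftr.Read];          % 1-by-N logical vector, one flag per simulation
nRetrieved = nnz(readFlags);     % number of outputs already retrieved
fprintf("%d of %d outputs retrieved\n",nRetrieved,numel(ftr))
```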

use fetchNext without any timeout to wait until an unretrieved simulation output becomes available and then return the results.

[idx,out] = fetchNext(ftr)
idx = 2
out = struct with fields:
    SimulationInfo: [1×1 struct]
         AgentData: [1×1 struct]

display the state of the simulations.

ftr.State
ans = 
'finished'
ans = 
'finished'
ans = 
'running'
ans = 
'running'
ans = 
'queued'
ans = 
'queued'

as expected, the first two simulations, which were running in parallel on the two workers, are finished, while the next two, which were previously queued, are now running, and the final two are still queued.

display the time taken for the first two simulations to complete.

toc
elapsed time is 10.451231 seconds.

note that once the results from a simulation have been retrieved, any attempt to use fetchNext to retrieve them again, such as in fetchNext(ftr(2)), results in an error. to retrieve the results from a Future object that has already been read, use fetchOutputs, such as in fetchOutputs(ftr(2)).
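
one way to avoid this error is to check the Read property before choosing the retrieval function. the following is a minimal sketch, assuming that ftr is the array of Future objects from this example; the index k and the variable outk are illustrative only.

```matlab
% assumes ftr is the array of Future objects from this example
k = 2;                                   % illustrative index
if ftr(k).Read
    outk = fetchOutputs(ftr(k));         % already read: fetchNext would error here
else
    [~,outk] = fetchNext(ftr(k));        % not yet read: waits for the result, then marks it as read
end
```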

retrieve the next available result, and display the time elapsed since the simulations started.

[idx,out] = fetchNext(ftr)
idx = 1
out = struct with fields:
    SimulationInfo: [1×1 struct]
         AgentData: [1×1 struct]
toc
elapsed time is 11.945070 seconds.

as expected, fetchNext promptly returns the results from the first simulation (idx = 1), since they were already available.

display which simulation outputs have already been retrieved.

ftr.Read
ans = logical
   1
ans = logical
   1
ans = logical
   0
ans = logical
   0
ans = logical
   0
ans = logical
   0

cancel the last simulation.

cancel(ftr(6))

wait for the fourth simulation to complete. the wait function blocks the command prompt until the fourth simulation is completed.

wait(ftr(4))

display the elapsed time since the simulations started.

toc
elapsed time is 12.414076 seconds.

display the state of the simulations.

ftr.State
ans = 
'finished'
ans = 
'finished'
ans = 
'finished'
ans = 
'finished'
ans = 
'running'
ans = 
'finished'

the state of the last element of the array, for which the simulation has been canceled, is reported as 'finished'.

since any attempt to retrieve results from a simulation that has been canceled will result in an error, remove the canceled object from the array.

ftr(6)=[]
ftr=1×5 object
  1×5 Future array with properties:
    Read
    State
    Diary
    ID

use fetchOutputs to wait until all remaining simulations are completed and then retrieve all outputs.

outs = fetchOutputs(ftr)
outs=5×1 struct array with fields:
    SimulationInfo
    AgentData

display the elapsed time.

toc
elapsed time is 16.265069 seconds.
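
as an alternative to retrieving everything at once with fetchOutputs, results can be processed one by one as they become available. the following is a minimal sketch, not part of the original example: it assumes that env and agent exist and that env is set up to simulate on workers, and it uses a hypothetical fresh array ftr2 with no canceled elements (the names ftr2, results, and tStart are illustrative).

```matlab
% assumes env and agent exist and env is set up with UseParallel=true;
% ftr2 is a hypothetical fresh array with no canceled elements
for k = 1:4
    ftr2(k) = runEpisode(env,agent,CleanupPostSim=false);
end

results = cell(1,numel(ftr2));
tStart = tic;
while ~all([ftr2.Read])
    [idx,out] = fetchNext(ftr2);     % blocks until one unread output is available
    results{idx} = out;              % store the output at the position of its simulation
    fprintf("simulation %d retrieved after %.1f s\n",idx,toc(tStart))
end
```

the loop ends once every element has been read, so each simulation is retrieved exactly once and fetchNext is never called on an array with nothing left to fetch.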

plot the action and observations from the fifth simulation.

figure
subplot(2,1,1);
plot(outs(5).AgentData.Time(2:end), ...
     cell2mat([outs(5).AgentData.Experiences.Action]))
title('simulation #5: action');
xlabel('time');
subplot(2,1,2)
plot(outs(5).AgentData.Time(2:end), ...
     cell2mat([outs(5).AgentData.Experiences.Observation]))
title('simulation #5: observations')
xlabel('time');

clear the array of Future objects and the environment, and then delete the parallel pool (this is the reverse of the order in which they were created).

clear ftr
clear env
delete(pp)

input arguments

f: future simulation outputs, specified as a Future object or as an array of Future objects. to create an element of f, set the UseParallel property of a reinforcement learning environment to true, and then use runEpisode to simulate an agent or a policy within this environment. assign the output of runEpisode to the element of f.

version history

introduced in r2022a
