Mule FTP: Create Directory If It Does Not Exist

Posted on the 15 September 2014 by Abhishek Somani @somaniabhi
The FTP and SFTP endpoints of Mule do not create a directory on the fly if it does not exist at the FTP location. Instead, an error like this is thrown:

ERROR 2007-11-05 15:24:29,192 [connector.ftp.0.receiver.1] org.mule.impl.DefaultExceptionStrategy: Caught exception in Exception Strategy: Failed to change working directory to /IN. Ftp error: 550
java.io.IOException: Failed to change working directory to /IN. Ftp error: 550
at org.mule.providers.ftp.FtpMessageReceiver.listFiles(FtpMessageReceiver.java:103)
at org.mule.providers.ftp.FtpMessageReceiver.poll(FtpMessageReceiver.java:72)
at org.mule.providers.PollingReceiverWorker.run(PollingReceiverWorker.java:47)
at org.mule.impl.work.WorkerContext.run(WorkerContext.java:310)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:987)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:528)
at java.lang.Thread.run(Thread.java:619)
There is an Open Issue for this functionality. To implement it for FTP, we have to create a custom FTP message dispatcher and plug it into the Mule FTP connector. So first, configure the Mule FTP connector to use a CustomFtpMessageDispatcherFactory, like this:

<!-- FTP connector whose dispatcher factory is replaced by CustomFtpMessageDispatcherFactory. -->
<ftp:connector name="ftp_con">
    <service-overrides dispatcherFactory="CustomFtpMessageDispatcherFactory"/>
</ftp:connector>
<!-- Reads files from E:/ftp and sends them to the FTP server through the ftp_con connector. -->
<flow name="sftpExampleFlow1" doc:name="sftpExampleFlow1">
    <file:inbound-endpoint path="E:/ftp" responseTimeout="10000" doc:name="File"/>

    <!-- outputPattern carries a directory part (custom/path) in front of the file name;
         the endpoint path (/home/ftp) is the base directory it is resolved against. -->
    <ftp:outbound-endpoint
        host="localhost"
        port="21"
        path="/home/ftp"
        outputPattern="custom/path/myFile.txt"
        connector-ref="ftp_con"
        responseTimeout="10000"
        doc:name="FTP"/>
</flow>
And here's the dispatcher factory class:

import org.mule.api.MuleException;
import org.mule.api.endpoint.OutboundEndpoint;
import org.mule.api.transport.MessageDispatcher;
import org.mule.transport.ftp.FtpMessageDispatcherFactory;
/**
 * Dispatcher factory that produces {@link CustomFtpMessageDispatcher} instances
 * instead of the dispatcher created by {@link FtpMessageDispatcherFactory}.
 */
public class CustomFtpMessageDispatcherFactory extends FtpMessageDispatcherFactory
{
    /**
     * Builds a dispatcher for the given outbound endpoint.
     *
     * @param endpoint the outbound endpoint the dispatcher will serve
     * @return a new {@link CustomFtpMessageDispatcher} constructed with {@code endpoint}
     * @throws MuleException declared by the signature; this body does not throw it itself
     */
    public MessageDispatcher create(OutboundEndpoint endpoint) throws MuleException
    {
        final MessageDispatcher dispatcher = new CustomFtpMessageDispatcher(endpoint);
        return dispatcher;
    }
}
And here's the CustomFtpMessageDispatcher, which first creates the directories named in the outputPattern, if it contains any.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTPClient;
import org.mule.api.MuleEvent;
import org.mule.api.endpoint.OutboundEndpoint;
import org.mule.api.transport.DispatchException;
import org.mule.config.i18n.CoreMessages;
import org.mule.model.streaming.CallbackOutputStream;
import org.mule.transport.ConnectException;
import org.mule.transport.ftp.FtpConnector;
import org.mule.transport.ftp.FtpMessageDispatcher;
/**
 * FTP dispatcher that creates the directories named in the endpoint's output pattern
 * before uploading the message payload.
 *
 * <p>When the output pattern contains a {@code /}, every segment before the last one is
 * treated as a directory below the endpoint path: each directory is entered, or created
 * and then entered, and the last segment is used as the remote file name. When the
 * pattern is absent or contains no {@code /}, the output stream is obtained from the
 * connector as-is.
 */
public class CustomFtpMessageDispatcher extends FtpMessageDispatcher
{
    private static final Log LOG = LogFactory.getLog(CustomFtpMessageDispatcher.class);

    /**
     * @param endpoint the outbound endpoint this dispatcher writes to
     */
    public CustomFtpMessageDispatcher(OutboundEndpoint endpoint)
    {
        super(endpoint);
    }

    /**
     * Uploads the payload of {@code event} to the FTP endpoint.
     *
     * @param event the event whose message payload is written; an {@link InputStream}
     *              payload is copied and closed, a {@code byte[]} is written directly, and
     *              anything else is written as {@code toString()} in the event encoding
     * @throws ConnectException   rethrown unwrapped so a retry policy can react to it
     * @throws DispatchException  when the directories cannot be prepared or the remote
     *                            stream cannot be opened
     * @throws Exception          for failures while writing or closing the stream
     */
    protected void doDispatch(MuleEvent event) throws Exception
    {
        Object data = event.getMessage().getPayload();
        String outputPattern = (String) endpoint.getProperty(FtpConnector.PROPERTY_OUTPUT_PATTERN);

        OutputStream out;
        // Short-circuit '&&': a missing output pattern must fall through to the default
        // branch instead of failing with a NullPointerException.
        if (outputPattern != null && outputPattern.contains("/"))
        {
            try
            {
                out = openStreamCreatingDirectories(outputPattern);
            }
            catch (ConnectException ce)
            {
                // Don't wrap a ConnectException, otherwise the retry policy will not go into effect.
                throw ce;
            }
            catch (Exception e)
            {
                throw new DispatchException(CoreMessages.streamingFailedNoStream(), event, (OutboundEndpoint) endpoint, e);
            }
        }
        else
        {
            out = connector.getOutputStream(getEndpoint(), event);
        }

        try
        {
            writePayload(data, out, event);
        }
        finally
        {
            out.close();
        }
    }

    /**
     * Enters (creating where necessary) every directory named in {@code outputPattern}
     * below the endpoint path and opens an upload stream for the final segment.
     *
     * <p>The returned stream completes the pending FTP command and releases the client
     * back to the connector when it is closed. If anything fails before the stream is
     * handed out, the client is released here so it is not leaked.
     *
     * NOTE(review): the client comes straight from {@code getFtp}; whether transfer mode
     * and file type are already configured on it is not visible from this class - confirm.
     *
     * @param outputPattern non-null pattern containing at least one {@code /}
     * @return stream writing to the remote file
     * @throws Exception when a directory cannot be created or entered, or the server
     *                   refuses to open the data stream
     */
    private OutputStream openStreamCreatingDirectories(String outputPattern) throws Exception
    {
        String basePath = endpoint.getEndpointURI().getPath();
        if (basePath.endsWith("/"))
        {
            basePath = basePath.substring(0, basePath.length() - 1);
        }
        String relativePattern = outputPattern.startsWith("/") ? outputPattern.substring(1) : outputPattern;
        // Limit -1 keeps a trailing empty segment, so "a/b/" yields an empty file name
        // rather than silently treating "b" as the file.
        String[] segments = relativePattern.split("/", -1);

        final FtpConnector ftpConnector = (FtpConnector) endpoint.getConnector();
        final FTPClient client = ftpConnector.getFtp(endpoint.getEndpointURI());
        try
        {
            String currentPath = basePath;
            for (int i = 0; i < segments.length - 1; i++)
            {
                if (segments[i].isEmpty())
                {
                    continue; // tolerate "a//b"
                }
                currentPath = currentPath + "/" + segments[i];
                enterOrCreateDirectory(client, currentPath);
            }

            String filename = segments[segments.length - 1];
            OutputStream raw = client.storeFileStream(filename);
            if (raw == null)
            {
                throw new IOException("FTP operation failed: " + client.getReplyString());
            }
            return new CallbackOutputStream(raw, new CallbackOutputStream.Callback()
            {
                public void onClose() throws Exception
                {
                    try
                    {
                        if (!client.completePendingCommand())
                        {
                            client.logout();
                            client.disconnect();
                            throw new IOException("FTP Stream failed to complete pending request");
                        }
                    }
                    finally
                    {
                        ftpConnector.releaseFtp(endpoint.getEndpointURI(), client);
                    }
                }
            });
        }
        catch (Exception e)
        {
            // No stream was handed out, so nobody else will release the client.
            try
            {
                ftpConnector.releaseFtp(endpoint.getEndpointURI(), client);
            }
            catch (Exception releaseFailure)
            {
                // Keep the original failure as the one that propagates.
                LOG.error("Error releasing FTP client after failed dispatch", releaseFailure);
            }
            throw e;
        }
    }

    /**
     * Makes {@code path} the working directory of {@code client}, creating it first when
     * it cannot be entered.
     *
     * @throws IOException when the directory can be neither entered nor created
     */
    private void enterOrCreateDirectory(FTPClient client, String path) throws IOException
    {
        if (client.changeWorkingDirectory(path))
        {
            return;
        }
        client.makeDirectory(path);
        String makeDirectoryReply = client.getReplyString();
        // Entering the directory afterwards is what makes the upload land inside it.
        // The result of makeDirectory is not checked on its own: the directory may have
        // appeared in the meantime, and the change below is the authoritative check.
        if (!client.changeWorkingDirectory(path))
        {
            throw new IOException("Failed to create or enter directory " + path
                + ". Create reply: " + makeDirectoryReply
                + " Change reply: " + client.getReplyString());
        }
    }

    /**
     * Writes {@code data} to {@code out}. Does not close {@code out}; an
     * {@link InputStream} payload is always closed, even when copying fails.
     */
    private void writePayload(Object data, OutputStream out, MuleEvent event) throws IOException
    {
        if (data instanceof InputStream)
        {
            InputStream is = (InputStream) data;
            try
            {
                IOUtils.copy(is, out);
            }
            finally
            {
                is.close();
            }
        }
        else
        {
            byte[] dataBytes;
            if (data instanceof byte[])
            {
                dataBytes = (byte[]) data;
            }
            else
            {
                dataBytes = data.toString().getBytes(event.getEncoding());
            }
            IOUtils.write(dataBytes, out);
        }
    }
}
Post Comments And Suggestions !!!